PHP 8.1 使用 Fiber 实现伪多线程操作

自从上次发了那篇《PHP8.1的多线程操作与单线程操作相关比较以及结论》之后,很多人来问我到底是怎么实现的,这次就来分享一下这个的 PHP 代码,看一下怎么使用 PHP 8.1 中的新特性 Fiber 实现伪多线程,也就是使用 Fiber 当作一个独立的线程。

入口文件 index.php

首先我们会引用两个 PHP 文件,分别是 async.phptClass.php 文件,分别是我们的为多线程执行的文件以及一个具体执行操作(function)的文件,然后我们这里会做一个比较,分别是用使用 Fiber 以及不使用的。

我们可以来看一下使用 Fiber 的 PHP 代码:

$test1 = new Fiber(
    function() {
        $requests = [];
        for($i = 1; $i <= 75; $i++) $requests[] = new tClass("abc", 123);

        $startTime = microtime(true);
        foreach($requests as $request) {
            $childFiber = new Fiber(function() use ($request){
                # Async::await($request->prepare(100000));
                print("准备好\n");
                $response = Async::await($request->fetch());
                print("执行完毕\n");
            });
            $childFiber->start();
        }

        Async::run();

        printf("全部任务多线程执行完成,执行耗时: %fs\n", microtime(true) - $startTime);
    }
);

$test1->start();

首先我们先设置好一个 Fiber,这个 Fiber 里面我们会先建立 100 个任务 Fiber 并且添加到同步执行 await 等候区中(详细可以看 async.php),然后初始执行一下(第14行),也就是先把我们任务的 Fiber 伪线程添加到 Fiber 伪线程池里面。

等到所有的 Fiber 任务都初始执行了之后我们就统一开始执行(第17行)。最后我们可以通过记录开始前和结束后的时间计算出执行用了多少时间(第19行)。当然到这里为止我们知识建立了一个 Fiber,我们最后还是需要启动一下这个Fiber的(第23行)

完成了 Fiber 的测试之后,我们来进行普通模式的测试,我们可以看一下具体 PHP 代码:

$startTime = microtime(true);
for($p = 1; $p <= 100; $p++) {
    # $file = "./test{$p}.txt";
    print("准备好\n");
    $time = microtime(true);
    while(microtime(true) <= $time + 1) {
        continue;
    }
    print("执行完毕\n");
}

printf("全部任务单线程执行完成,执行耗时: %fs\n", microtime(true) - $startTime);

同样我们也是建立 100 个任务,但是这次是单线程执行,这里也可以看到我们的执行程序,就是单纯的等一秒然后返回。这里没啥好说的,就是执行 100 次然后同样输出一下时间。

全部代码

<?php

require_once("./async.php");
require_once("./tClass.php");

print("PHP8.1下的异步操作与顺序操作的比较\n");

$test1 = new Fiber(
    function() {
        $requests = [];
        for($i = 1; $i <= 75; $i++) $requests[] = new tClass("abc", 123);

        $startTime = microtime(true);
        foreach($requests as $request) {
            $childFiber = new Fiber(function() use ($request){
                # Async::await($request->prepare(100000));
                print("准备好\n");
                $response = Async::await($request->fetch());
                print("执行完毕\n");
            });
            $childFiber->start();
        }

        Async::run();

        printf("全部任务多线程执行完成,执行耗时: %fs\n", microtime(true) - $startTime);
    }
);

$test1->start();

$startTime = microtime(true);
for($p = 1; $p <= 100; $p++) {
    # $file = "./test{$p}.txt";
    print("准备好\n");
    $time = microtime(true);
    while(microtime(true) <= $time + 1) {
        continue;
    }
    print("执行完毕\n");
}

printf("全部任务单线程执行完成,执行耗时: %fs\n", microtime(true) - $startTime);
?>

同步处理文件 async.php

首先我们先设立一个 Fiber 伪线程池 $activeAwaits ,然后我们可以看到有两个功能,一个是添加 Fiber 伪线程的,一个是执行所有 Fiber 伪线程的,我们可以先看看 await(Fiber $childFiber) 的具体 PHP 代码:

    public static function await(Fiber $childFiber): mixed {
        self::$activeAwaits[] = [Fiber::getCurrent(), $childFiber];
        $childFiber->start();
        while ($childFiber->isTerminated() === false) {
            $childFiber->resume();

            if(!$childFiber->isTerminated()) {
                Fiber::suspend();
            } else {
                break;
            }
        }

        return $childFiber->getReturn();
    }

首先我们先把父 Fiber 和我们现在传入的 Fiber 添加到 Fiber 伪线程池里面,这样我们之后执行的时候执行完这个传入的 Fiber 就能返回给我们的父 Fiber 了。添加了之后我们先初始运行一下 $childFiber,这里如果直接能够执行完毕没有调用 Fiber::suspend() 的话就直接返回了,否则的话就先放着等待之后统一执行(第8行,这个 Fiber::suspend() 是让父 Fiber 暂停的,不是让子 Fiber 暂停的,也就是子 Fiber 现在还在 $activeAwaits里面呆着)。

接下来我们来看一下run() 的具体 PHP 代码:

    public static function run(): void{
        while(count(self::$activeAwaits) > 0) {
            $toRemove = [];
            foreach(self::$activeAwaits as $index => $pair) {
                $parentFiber = $pair[0];
                $childFiber = $pair[1];

                if($parentFiber->isSuspended() && $parentFiber->isTerminated() === false) {
                    $parentFiber->resume();
                } elseif ($parentFiber->isTerminated()) {
                    $toRemove[] = $index;
                }
            }

            foreach($toRemove as $indexToRemove) {
                unset(self::$activeAwaits[$indexToRemove]);
            }

            self::$activeAwaits = array_values(self::$activeAwaits);
        }
    }

这里就是只要我们的 Fiber 伪线程池 $activeAwaits 里面还有线程,我们就不断执行。首先我们要定义一个要移除的 Fiber 的阵列,方便等下移除已经执行完毕的 Fiber 。然后我们就一个一个的读 Fiber 伪线程池 $activeAwaits 里面的 Fiber 伪线程然后运行他们,如果父 Fiber 伪线程被暂停的话就继续执行,也就是继续执行上面 run() 的 PHP 代码:

        while ($childFiber->isTerminated() === false) {
            $childFiber->resume();

            if(!$childFiber->isTerminated()) {
                Fiber::suspend();
            } else {
                break;
            }
        }

可以看到这个父 Fiber 伪线程是在不断的暂停然后继续的,所以这就是为什么说这是一个伪多线程的实现,不是真正的多线程,就是因为这个依旧是单独线程,只是执行的时候没有真正的等待,因为我们创建 Fiber 的时候已经设置好了开始等待的时间(也就是下面 tClass.php 的内容),然后把 Fiber 暂停了,但是时间还是一样的,所以这里其实是 100 个 Fiber 集体等待 1 秒钟而不是每个 Fiber 都单独等了一秒钟,也就是说这 1 秒里面是在 100 个 Fiber 里面不断切换执行的,而不是 100 个 Fiber 同时执行的。

全部代码

<?php

class Async {
    public static array $activeAwaits = [];

    public static function await(Fiber $childFiber): mixed {
        self::$activeAwaits[] = [Fiber::getCurrent(), $childFiber];
        $childFiber->start();
        while ($childFiber->isTerminated() === false) {
            $childFiber->resume();

            if(!$childFiber->isTerminated()) {
                Fiber::suspend();
            } else {
                break;
            }
        }

        return $childFiber->getReturn();
    }

    public static function run(): void{
        while(count(self::$activeAwaits) > 0) {
            $toRemove = [];
            foreach(self::$activeAwaits as $index => $pair) {
                $parentFiber = $pair[0];
                $childFiber = $pair[1];

                if($parentFiber->isSuspended() && $parentFiber->isTerminated() === false) {
                    $parentFiber->resume();
                } elseif ($parentFiber->isTerminated()) {
                    $toRemove[] = $index;
                }
            }

            foreach($toRemove as $indexToRemove) {
                unset(self::$activeAwaits[$indexToRemove]);
            }

            self::$activeAwaits = array_values(self::$activeAwaits);
        }
    }
}

?>

具体执行文件 tClass.php

这个其实就跟我们单线程执行的那个很像,我们直接上全部的 PHP 代码:

<?php

class tClass {

    public function __construct(private string $file, private int $cnt){}

    public function fetch() {
        return new \Fiber(
            function () {
                Fiber::suspend();
                $time = microtime(true);
                while(microtime(true) <= $time + 1) {
                    Fiber::suspend();
                    echo "继续执行\n";
                    continue;
                }
                return true;
            }
        );
    }
}

?>

唯一一点不一样的就是我们需要添加 Fiber::suspend() ,但是我们可以看到我们在执行等待的时候,也是要执行 Fiber::suspend() 的,所以其实当我们执行真正需要大量等待操作的时候(例如 curl 远程获取网页),还是会遇到需要等待的清空,这也是为啥这只是一个伪多线程。

具体执行

至于要执行上面的代码,其实我们只需要在 Linux SSH里面执行下面的代码就可以看到了:

php index.php