Spiga

communicating-with-threads-in-php

Earlier this week, I suggested PHP could be multithreaded. The sample I provided was very simple and at least one reader quickly wondered how to communicate with threads.
If you haven’t already, take a look at part 1 to get some basic information about threads in PHP.
It took a bit longer to get that part working that simple threads, but as of now, I have a functional prototype of an HVAC thread.

HVAC.php


require "ThreadInstance.php";
class HVAC extends ThreadInstance {
        var $currentTemp;
        var $heaterOn;
        var $acOn;
        var $toggleOn;
        function HVAC () {
                $this->setup();
                $this->currentTemp = 32;
                $this->heaterOn = false;
                $this->acOn = false;
                $this->toggleOn = 70;
        }
        function runHeater() {
                $this->heaterOn = true;
                $this->acOn = false;
        }
        function runAc() {
                $this->heaterOn = false;
                $this->acOn = true;
        }
        function process() {
                switch (true) {
                        case ($this->heaterOn):
                                return $this->processHeater();
                        case ($this->acOn):
                                return $this->processAc();
                }
        }
        function processHeater() {
                if ($this->currentTemp < $this->toggleOn) {
                        $this->currentTemp++;
                }
        }
        function processAc() {
                if ($this->currentTemp > $this->toggleOn) {
                        $this->currentTemp–;
                }
        }
        function apploop($command) {
                $this->process();
                switch ($command) {
                        case “”:
                                // noop
                                return;
                        case “start heater”:
                                $this->runHeater();
                                $this->response (”ok”, NULL);
                                return;
                        case “start ac”:
                                $this->runAc();
                                $this->response(”ok”, NULL);
                                return;
                        case “get current temp”:
                                $this->response (”ok”, $this->currentTemp);
                                return;
                        case “set temp”:
                                $temp = intval($this->getLine(true));
                                if ($temp) {
                                        $this->toggleOn = $temp;
                                        $this->response (”ok”, NULL);
                                } else {
                                        $this->response (”err”, “not a number”);
                                }
                                return;
                        case “quit”:
                                exit;
                        default:
                                $this->response (”err”, “bad request - $command”);
                                return;
                }
        }
}
$hvac = new HVAC();
do {
        sleep (1);
        $hvac->apploop($hvac->getCommand());
} while (true);


ThreadInstance.php


set_time_limit (0);
require "ThreadUtility.php";
class ThreadInstance {
        var $stdin;
        var $commandbuffer;
        var $stdout;
        function setup() {
                $this->stdin = fopen ("php://stdin", "r");
                $this->stderr = fopen ("php://stderr", "w");
                stream_set_blocking ($this->stdin, false);
                $this->commandbuffer = (array)NULL;
                $this->outbuffer = "";
        }
        function getCommand() {
                $command = fgets ($this->stdin, 1024);
                $this->commandbuffer[] = $command;

                $command = array_shift ($this->commandbuffer);
                return trim($command);
        }
        function response ($status, $data) {
                response ($status, $data);
        }
        function getLine ($wait = false) {
                if ($wait) {
                        $buffer = "";
                        while (!strlen($buffer)) {
                                $buffer .= fgets ($this->stdin, 1024);
                        }
                } else {
                        $buffer = fgets ($this->stdin, 1024);
                }
                return trim($buffer);
        }
        function debug ($text) {
                fwrite ($this->stderr, $text);
        }
}


ThreadUtility.php


function response ($status, $response) {
        echo $status . "\n";
        echo base64_encode(serialize($response)), "\n";
}
function processresponse ($string) {
        $parts = explode ("\n", $string);
        $status = $parts[0];
        $data = unserialize (base64_decode ($parts[1]));
        return array ("status" => $status, "data" => $data);
}


Thread.php


require "ThreadUtility.php";
class Thread {
        var $pref ;
        var $pipes;
        var $pid;
        var $stdout;
        function Thread() {
                $this->pref = 0;
                $this->stdout = "";
                $this->pipes = (array)NULL;
        }
        function Create ($url) {
                $t = new Thread;
                $descriptor = array (0 => array ("pipe", "r"), 1 => array ("pipe", "w"), 2 => array ("pipe", "w"));
                $t->pref = proc_open ("php -q $url ", $descriptor, $t->pipes);
                stream_set_blocking ($t->pipes[1], 0);
                stream_set_blocking ($t->pipes[2], 0);
                usleep (10);
                return $t;
        }
        function isActive () {
                $this->stdout .= $this->listen();
                $f = stream_get_meta_data ($this->pipes[1]);
                return !$f["eof"];
        }
        function close () {
                $this->tell("quit");
                $r = proc_close ($this->pref);
                $this->pref = NULL;
                return $r;
        }
        function tell ($thought) {
                fwrite ($this->pipes[0], $thought . "\n");
                $response = "";
                do {
                        $response = $this->listen();
                } while ($response == "");
                return processresponse ($response);
        }
        function listen () {
                $buffer = $this->stdout;
                $this->stdout = "";
                while ($r = fgets ($this->pipes[1], 1024)) {
                        $buffer .= $r;
                }
                return $buffer;
        }
        function getError () {
                $buffer = "";
                while ($r = fgets ($this->pipes[2], 1024)) {
                        $buffer .= $r;
                }
                return $buffer;
        }
}


hvac.php


set_time_limit(0);
include "Thread.php";
$tHVAC = Thread::create("HVAC.php");
$r = $tHVAC->tell("start heater");
echo "Start heater: ", $r["status"], "\n";
if ($r["status"] == "err") {
        $tHVAC->close();
        exit;
}
$tHVAC->tell("set temp\n50");
$goingUp = true;
while ($tHVAC->isActive()) {
        echo $tHVAC->getError();
        $r = $tHVAC->tell("get current temp");
        if ($r["status"] == "ok") {
                echo "Current Temperature: ", $r["data"], "\n";
        }
        if ($r["data"] == 50 && $goingUp) {
                $tHVAC->tell("set temp\n35");
                $tHVAC->tell("start ac");
                $goingUp = false;
        } else if ($r["data"] == 35 && !$goingUp) {
                echo "Main Thread donenWaiting for HVAC Thread to end... ";
                $tHVAC->close();
                echo "ok";
                exit;
        }
}
Yes, it’s a lot of code for a sample.
In the original example, I assumed a file would have a given task. If you needed to generate a report while running a lengthy query, generate.php and analyze.php would make good threads. But real life is not that simple. So what I’ve now done is made a REPL loop in HVAC.php (the thread). This loop is what facilitates communications. It constantly polls for commands.
To understand this, let’s first ignore Thread.php, ThreadUtility.php and ThreadInstance.php. hvac.php is the stub application and HVAC.php is the HVAC unit and what we want threaded. REPL functionality is easily accomplished:

$hvac = new HVAC();
do {
sleep (1);
$hvac->apploop($hvac->getCommand());
} while (true);

sleep(1) is not strictly neccessary. It’s there to prevent real-time continuous polling. An initial delay is useful, but it doesn’t really need to be in the loop nor need it be so long. apploop is the controller for this thread. getCommand retrieves the oldest unprocessed command (typically, the only command) for to be executed.
Inside of apploop, logic is carried out like normal. In place of a clean exit, though, commands return response. This is an array comprised of a status and data. Status has two established values, err and ok. Data is a field allowing whatever data is useful to be passed back. It can be an object, however, the class must be defined in both contexts for it to be usable.
On the stub side, the thread is set up the same way as before. The first difference is that calls to tell return results. You will quickly know whether an operation succeeded without entering a listen poll loop.
One quirky statement is $tHVAC->tell("set temp\n50"); Commands are only one line. The set temp command is defined to prompt for a temperature, and it happens that the command issuing stream is the stream on which the temperature is polled. Therefore, we prime the command stream with a second statement (50) before it’s polled. This is not an ideal way to operate, but at this time, there is no more formalized architecture.
Finally, there’s a loop to set the temperature. With the heater turned on, the temperature is set to 50º. Once 50º is reached, the air conditioner is switched on and the temperature is lowered to 35º. Then the application exits.
isActive is not as accurate under this model. Since the thread is in a continuous loop, executon never truly stops. isActive will only stop if some error has taken down the thread. Therefore, one of the commands this thread supports is quit. Also, the general thread class has been altered to dispatch a quit instruction before it pulls the plug on the process.
The main application will not exit until all spawned threads exit. Therefore, it’s important that you’ve set up some remote-controllable mechanism to end it.

0 comments:

ShareThis