Skip to content

Commit

Permalink
Callback run (#17)
Browse files Browse the repository at this point in the history
* add a CallbackRun runner

* increase coverage

* fix lint

* remove type comparison between float and int
  • Loading branch information
Harry Bragg authored Aug 30, 2018
1 parent 6253338 commit 1e723ef
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 11 deletions.
208 changes: 208 additions & 0 deletions src/CallbackRun.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
<?php
/**
* This file is part of graze/parallel-process.
*
* Copyright © 2018 Nature Delivered Ltd. <https://www.graze.com>
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*
* @license https://github.com/graze/parallel-process/blob/master/LICENSE.md
* @link https://github.com/graze/parallel-process
*/

namespace Graze\ParallelProcess;

use Exception;
use Graze\ParallelProcess\Event\EventDispatcherTrait;
use Graze\ParallelProcess\Event\RunEvent;
use Throwable;

class CallbackRun implements RunInterface, OutputterInterface
{
use EventDispatcherTrait;

/** @var callable */
private $callback;
/** @var float */
private $started = 0;
/** @var float */
private $finished = 0;
/** @var bool */
private $successful = false;
/** @var string[] */
private $tags;
/** @var Exception|null */
private $exception = null;
/** @var string */
private $last;

/**
* Run constructor.
*
* @param callable $callback A callback to run, if this returns a string, it can be accessed from the
* `->getLastMessage()` calls
* @param string[] $tags List of key value tags associated with this run
*/
public function __construct(callable $callback, array $tags = [])
{
$this->callback = $callback;
$this->tags = $tags;
}

/**
* @return string[]
*/
protected function getEventNames()
{
return [
RunEvent::STARTED,
RunEvent::COMPLETED,
RunEvent::FAILED,
RunEvent::UPDATED,
];
}

/**
* Start the process
*
* @return $this
*/
public function start()
{
if ($this->started == 0) {
$this->started = microtime(true);
$this->dispatch(RunEvent::STARTED, new RunEvent($this));
try {
$output = call_user_func($this->callback);
$this->handleOutput($output);
$this->finished = microtime(true);
$this->successful = true;
$this->dispatch(RunEvent::COMPLETED, new RunEvent($this));
} catch (Exception $e) {
$this->finished = microtime(true);
$this->successful = false;
$this->exception = $e;
$this->dispatch(RunEvent::FAILED, new RunEvent($this));
}
}

return $this;
}

/**
* @param mixed $output The output from the callback, if you want to send this back, return a string|string[]
*/
private function handleOutput($output)
{
if (is_string($output)) {
$output = explode("\n", $output);
}
if (is_array($output)) {
foreach ($output as $line) {
if (is_string($line)) {
$line = rtrim($line);
if (mb_strlen($line) > 0) {
$this->last = $line;
$this->dispatch(RunEvent::UPDATED, new RunEvent($this));
}
}
}
}
}

/**
* Poll to see if the callback is still running (hint: it is not)
*
* @return bool
*/
public function poll()
{
// non async process, so it will have finished when calling start
return false;
}

/**
* Return if the underlying process is running
*
* @return bool
*/
public function isRunning()
{
return false;
}

/**
* @return bool
*/
public function isSuccessful()
{
return $this->successful;
}

/**
* @return bool
*/
public function hasStarted()
{
return $this->started > 0;
}

/**
* @return array
*/
public function getTags()
{
return $this->tags;
}

/**
* @return float number of seconds this run has been running for (0 for not started)
*/
public function getDuration()
{
if ($this->finished > 0) {
return $this->finished - $this->started;
}
return $this->started > 0 ? microtime(true) - $this->started : 0;
}

/**
* @return float[]|null the process between 0 and 1 if the run supports it, otherwise null
*/
public function getProgress()
{
return null;
}

/**
* If the run was unsuccessful, get the error if applicable
*
* @return Exception[]|Throwable[]
*/
public function getExceptions()
{
if ($this->exception !== null) {
return [$this->exception];
}
return [];
}

/**
* Get the last message that this thing produced
*
* @return string
*/
public function getLastMessage()
{
return $this->last;
}

/**
* @return string
*/
public function getLastMessageType()
{
return '';
}
}
4 changes: 2 additions & 2 deletions src/Lines.php
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,8 @@ public function add(RunInterface $run)
function (RunEvent $event) use ($index) {
$run = $event->getRun();
$message = '';
if ($run instanceof Run) {
$message = ($this->showType && $run instanceof Run)
if ($run instanceof OutputterInterface) {
$message = ($this->showType && $run->getLastMessageType() !== '')
? sprintf('(%s) %s', $run->getLastMessageType(), $run->getLastMessage())
: $run->getLastMessage();
}
Expand Down
18 changes: 18 additions & 0 deletions src/OutputterInterface.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
<?php

namespace Graze\ParallelProcess;

interface OutputterInterface
{
/**
* Get the last message that this thing produced
*
* @return string
*/
public function getLastMessage();

/**
* @return string
*/
public function getLastMessageType();
}
12 changes: 4 additions & 8 deletions src/Run.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,10 @@
use Symfony\Component\Process\Process;
use Throwable;

class Run implements RunInterface
class Run implements RunInterface, OutputterInterface
{
use EventDispatcherTrait;

const ON_SUCCESS = 1;
const ON_FAILURE = 2;
const ON_PROGRESS = 3;

/** @var Process */
private $process;
/** @var float */
Expand All @@ -44,14 +40,14 @@ class Run implements RunInterface
private $updateOnPoll = true;
/** @var bool */
private $updateOnProcessOutput = true;
/** @var array */
/** @var string[] */
private $tags;

/**
* Run constructor.
*
* @param Process $process
* @param array $tags List of key value tags associated with this run
* @param Process $process
* @param string[] $tags List of key value tags associated with this run
*/
public function __construct(Process $process, array $tags = [])
{
Expand Down
2 changes: 1 addition & 1 deletion src/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ function (PoolRunEvent $event) {
private function formatRow(RunInterface $run, $status)
{
$tags = $this->formatTags($run->getTags());
$extra = ($this->showOutput && $run instanceof Run && mb_strlen($run->getLastMessage()) > 0)
$extra = ($this->showOutput && $run instanceof OutputterInterface && mb_strlen($run->getLastMessage()) > 0)
? ' ' . $this->terminal->filter($run->getLastMessage())
: '';
return sprintf("%s (<comment>%6.2fs</comment>) %s%s", $tags, $run->getDuration(), $status, $extra);
Expand Down
Loading

0 comments on commit 1e723ef

Please sign in to comment.