Scripts
Streams lib
Kleine library om non-blocking streams over TCP te maken. Handig als je meerdere verbindingen wilt maken of niet wilt wachten totdat een verbinding klaar is met connecten/lezen/schrijven. Het callback-idee is losjes gebaseerd op Node.js. Gebruiksvoorbeelden vind je op: https://github.com/Pannekoek/Streams/wiki/_pages
Connection.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
use \LogicException;
use \InvalidArgumentException;
/**
 * Non-blocking client connection over TCP, optionally wrapped in SSL.
 *
 * A connection registers itself with the Poller singleton when connect ()
 * is called; the poller then drives connect (), read () and write () as the
 * underlying stream becomes ready. State changes are reported through the
 * public callback properties:
 *
 *  - onConnect   (Connection, int $timeConnected)
 *  - onRead      (Connection, StringBuffer $readBuffer, int $bytesRead)
 *  - onWrite     (Connection, StringBuffer $writeBuffer, int $bytesWritten)
 *  - onTerminate (Connection, int $status)
 *  - onError     (Connection, int $errorCode)
 */
class Connection
{
    // Global constants
    // NOTE(review): CERT_FILE_PATH points at a directory and is passed as the
    // 'cert_file' SSL context option; confirm this is the intended option
    // name and path before relying on secure connections.
    const CERT_FILE_PATH = './';
    // Not referenced anywhere in this class.
    const CONN_MAX_TIME = 5;

    // Connection details
    protected $r_Socket = null;
    protected $i_Status = 0;
    private $s_rAddress = null;
    private $i_rPort = null;
    private $b_Secure = false;
    private $s_bindTo = null;
    private $i_timeConnected = 0;

    // Buffers for data
    private $readBuffer = null;
    private $writeBuffer = null;

    // Callbacks
    public $onConnect = null;
    public $onTerminate = null;
    public $onRead = null;
    public $onWrite = null;
    public $onError = null;

    /**
     * Prepares (but does not open) a connection to the given endpoint.
     *
     * @param string      $s_Address     Remote host name or IP address.
     * @param int         $i_Port        Remote port; values below 2 or above 65535 are rejected.
     * @param string|null $s_bindAddress Optional local "ip:port" to bind to.
     * @param bool        $b_Secure      Use the ssl:// transport instead of tcp://.
     *
     * @throws InvalidArgumentException When the address or port is rejected.
     * @throws LogicException           When a secure connection is requested but
     *                                  OpenSSL or the certificate path is missing.
     */
    public function __construct($s_Address, $i_Port, $s_bindAddress = null, $b_Secure = false)
    {
        // NOTE(review): gethostbyname () returns its input unchanged on failure,
        // so this only rejects empty-ish addresses; kept as-is to avoid
        // changing which addresses are accepted.
        $b_Unresolved = !@gethostbyname($s_Address) && !@gethostbyaddr($s_Address);
        if ($b_Unresolved || $i_Port > 65535 || $i_Port < 2) {
            throw new InvalidArgumentException('Address "' . $s_Address . ':' . $i_Port . '" could not be resolved or is invalid.');
        }

        $this->s_rAddress = $s_Address;
        $this->i_rPort    = $i_Port;
        $this->s_bindTo   = $s_bindAddress;
        $this->b_Secure   = $b_Secure;

        if ($this->b_Secure) {
            if (!extension_loaded('openssl')) {
                throw new LogicException('Cannot create secure connection withouth the OpenSSL extension loaded.');
            }
            if (!file_exists(self::CERT_FILE_PATH)) {
                throw new LogicException('Cannot create secure connection withouth a certificate file.');
            }
        }

        // Create buffers
        $this->readBuffer  = new StringBuffer();
        $this->writeBuffer = new StringBuffer();
    }

    /**
     * Closes the connection if it is still open and releases the buffers.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->disconnect();
        } else {
            $this->_forceClose();
        }

        $this->readBuffer  = null;
        $this->writeBuffer = null;
    }

    /**
     * Grants read access to the two private buffers only.
     *
     * @param string $s_Member Name of the property being read.
     *
     * @return StringBuffer
     *
     * @throws LogicException For any other inaccessible property.
     */
    public function __get($s_Member)
    {
        switch ($s_Member) {
            case 'readBuffer':
                return $this->readBuffer;
            case 'writeBuffer':
                return $this->writeBuffer;
            default:
                throw new LogicException('Acces to "' . $s_Member . '" is not allowed.');
        }
    }

    /**
     * @return string The remote endpoint as "address:port".
     */
    public function __toString()
    {
        return $this->s_rAddress . ':' . $this->i_rPort;
    }

    /**
     * @return int Unix timestamp set when the connection was established, 0 otherwise.
     */
    public function getTimeConnected()
    {
        return $this->i_timeConnected;
    }

    /**
     * @return int One of the Status constants.
     */
    public function getStatus()
    {
        return $this->i_Status;
    }

    /**
     * @return bool Whether the status equals Status::CONNECTED.
     */
    public function isConnected()
    {
        return ($this->i_Status == Status::CONNECTED);
    }

    /**
     * @return bool Whether this connection was created with the secure flag.
     */
    public function isSecure()
    {
        return $this->b_Secure;
    }

    /**
     * @return bool Whether data is waiting in the write buffer.
     */
    public function hasWriteBuffer()
    {
        return $this->writeBuffer->hasBuffer();
    }

    /**
     * @return bool Whether data is waiting in the read buffer.
     */
    public function hasReadBuffer()
    {
        return $this->readBuffer->hasBuffer();
    }

    /**
     * Returns the remote name of the socket.
     *
     * @return string|false
     *
     * @throws LogicException When the connection is not established.
     */
    public function getRemoteAddress()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot receive remote address on socket that is not connected.');
        }

        return stream_socket_get_name($this->r_Socket, true);
    }

    /**
     * Returns the local name of the socket.
     *
     * @return string|false
     *
     * @throws LogicException When the connection is not established.
     */
    public function getLocalAddress()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot receive local address on socket that is not connected.');
        }

        return stream_socket_get_name($this->r_Socket, false);
    }

    /**
     * Sets the character encoding of the stream.
     *
     * Returns false instead of raising a fatal error when mbstring or
     * stream_encoding () is unavailable, or when no socket is open yet.
     *
     * @param string $s_Encoding Encoding name as listed by mb_list_encodings ().
     *
     * @return bool
     */
    public function setEncoding($s_Encoding)
    {
        if (!function_exists('mb_list_encodings') || !in_array($s_Encoding, mb_list_encodings(), true)) {
            return false;
        }
        if (!function_exists('stream_encoding') || !is_resource($this->r_Socket)) {
            return false;
        }

        return stream_encoding($this->r_Socket, $s_Encoding);
    }

    /**
     * Sets the line ending used by both the read and the write buffer.
     *
     * @param string $s_lineEnd
     *
     * @return bool Always true.
     */
    public function setLineEnding($s_lineEnd)
    {
        $this->readBuffer->setLineEnding($s_lineEnd);
        $this->writeBuffer->setLineEnding($s_lineEnd);

        return true;
    }

    /**
     * Starts, or completes, the asynchronous connect.
     *
     * The first call opens the socket, registers it with the poller and
     * returns false. The poller calls this method again once the socket
     * reports activity; that call settles the status and fires onConnect or
     * onError.
     *
     * @return bool True once connected, false while connecting or on failure.
     */
    public function connect()
    {
        if ($this->isConnected()) {
            return true;
        }

        // Second phase: the poller reports activity on a connecting socket.
        if ($this->i_Status == Status::CONNECTING) {
            if (feof($this->r_Socket)) {
                $this->i_Status = Status::ERROR;
                $this->_doCallback('onError', array($this, Error::CONNFAILED));

                return false;
            }

            $this->i_Status        = Status::CONNECTED;
            $this->i_timeConnected = time();
            $this->_doCallback('onConnect', array($this, $this->i_timeConnected));

            return true;
        }

        // First phase: build the stream context and open the socket.
        $a_Opts   = array();
        $s_Scheme = 'tcp://';

        if ($this->b_Secure) {
            $s_Scheme       = 'ssl://';
            $a_Opts['ssl'] = array(
                'verify_peer'       => false,
                'allow_self_signed' => true,
                'cert_file'         => self::CERT_FILE_PATH,
                'passphrase'        => ''
            );
        }

        if (!empty($this->s_bindTo)) {
            $a_Opts['socket'] = array('bindto' => $this->s_bindTo);
        }

        $i_Flags   = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT;
        $r_Context = stream_context_create($a_Opts);
        $s_Target  = $s_Scheme . $this->s_rAddress . ':' . $this->i_rPort;

        $this->r_Socket = stream_socket_client($s_Target, $i_Errno, $s_Errstr, null, $i_Flags, $r_Context);

        if (!is_resource($this->r_Socket)) {
            $this->i_Status = Status::ERROR;
            $this->_doCallback('onError', array($this, Error::INITFAILED));

            return false;
        }

        stream_set_blocking($this->r_Socket, false);
        $this->i_Status = Status::CONNECTING;

        // Let the poller watch this socket from now on.
        Poller::getInstance()->addConnection($this->r_Socket, $this);

        return false;
    }

    /**
     * Closes an established connection from our side.
     *
     * Pending write data is flushed once, best effort, without callbacks.
     *
     * @return bool Always true.
     */
    public function disconnect()
    {
        if (!$this->isConnected()) {
            return true;
        }

        if ($this->writeBuffer->hasBuffer()) {
            // Bypass write () on purpose: no callbacks and no error handling
            // are wanted while shutting down.
            $i_Written = @fwrite($this->r_Socket, $this->writeBuffer->getBuffer());
            if ($i_Written !== false) {
                $this->writeBuffer->removeLength($i_Written);
            }
        }

        $this->_forceClose();
        $this->i_Status = Status::DISCONNECTED_LOCAL;
        $this->_doCallback('onTerminate', array($this, Status::DISCONNECTED_LOCAL));

        return true;
    }

    /**
     * Writes as much of the write buffer as the socket accepts.
     *
     * @return bool True on success, false when the write failed (the
     *              connection is then closed and onError fired).
     *
     * @throws LogicException When the connection is not established.
     */
    public function write()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot perform write on socket that is not connected.');
        }

        $i_Written = fwrite($this->r_Socket, $this->writeBuffer->getBuffer());
        if ($i_Written === false) {
            $this->i_Status = Status::ERROR;
            $this->_forceClose();
            $this->_doCallback('onError', array($this, Error::WRITEERR));

            return false;
        }

        $this->writeBuffer->removeLength($i_Written);
        $this->_doCallback('onWrite', array($this, $this->writeBuffer, $i_Written));

        return true;
    }

    /**
     * Reads up to 8192 bytes from the socket into the read buffer.
     *
     * @return bool True when the read succeeded (possibly with no data),
     *              false when the remote side closed or the read failed.
     *
     * @throws LogicException When the connection is not established.
     */
    public function read()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot perform read on socket that is not connected.');
        }

        if (feof($this->r_Socket)) {
            return $this->_remoteClosed();
        }

        $s_Chunk = fread($this->r_Socket, 8192);

        // Only a strict false is a read error; "0" is valid payload.
        if ($s_Chunk === false) {
            $this->i_Status = Status::ERROR;
            $this->_forceClose();
            $this->_doCallback('onError', array($this, Error::READERR));

            return false;
        }

        if ($s_Chunk === '') {
            // An empty read either means the peer just closed (end-of-file is
            // only flagged after a read attempt) or that nothing is available.
            if (feof($this->r_Socket)) {
                return $this->_remoteClosed();
            }

            return true;
        }

        $this->readBuffer->appendBuffer($s_Chunk);
        $this->_doCallback('onRead', array($this, $this->readBuffer, strlen($s_Chunk)));

        return true;
    }

    /**
     * Handles end-of-file: closes the socket and reports a remote disconnect.
     *
     * @return bool Always false.
     */
    private function _remoteClosed()
    {
        $this->_forceClose();
        $this->i_Status = Status::DISCONNECTED_REMOTE;
        $this->_doCallback('onTerminate', array($this, Status::DISCONNECTED_REMOTE));

        return false;
    }

    /**
     * Shuts down and closes the socket resource and unregisters from the poller.
     */
    private function _forceClose()
    {
        if (!is_resource($this->r_Socket)) {
            return;
        }

        stream_socket_shutdown($this->r_Socket, STREAM_SHUT_RDWR);
        fclose($this->r_Socket);

        $this->r_Socket        = null;
        $this->i_timeConnected = 0;

        Poller::getInstance()->removeConnection($this);
    }

    /**
     * Invokes the callback stored in the named public property, if callable.
     *
     * @param string $s_Event Property name, e.g. 'onRead'.
     * @param array  $a_Args  Arguments passed to the callback.
     */
    private function _doCallback($s_Event, array $a_Args = array())
    {
        if (isset($this->$s_Event) && is_callable($this->$s_Event)) {
            call_user_func_array($this->$s_Event, $a_Args);
        }
    }
}
Listening.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
use \LogicException;
use \InvalidArgumentException;
/**
 * Non-blocking listening (server) socket over TCP, optionally SSL.
 *
 * Once listen () succeeds the socket is registered with the Poller singleton,
 * which calls accept () whenever the socket becomes readable. Events are
 * reported through the public callback properties:
 *
 *  - onAccept    (Listening, Accept $connection)
 *  - onTerminate (Listening)
 *  - onError     (Listening, int $errorCode)
 */
class Listening
{
    // Global constants
    // NOTE(review): this is a directory and is passed as the 'cert_file' SSL
    // context option; confirm the intended option name and certificate path.
    const CERT_FILE_PATH = './';

    // Stream details
    private $r_Socket = null;
    private $i_Status = 0;
    private $s_lAddress = null;
    private $i_lPort = 0;
    private $i_listenStart = 0;
    private $i_Accepted = 0;
    private $b_Secure = false;

    // Callbacks
    public $onAccept = null;
    public $onTerminate = null;
    public $onError = null;

    /**
     * @param int         $i_Port    Port to listen on; 0 lets the system pick one.
     * @param string|null $s_Address Local address to bind to; null or '' binds
     *                               to 0.0.0.0 when listen () is called.
     * @param bool        $b_Secure  Use the ssl:// transport instead of tcp://.
     *
     * @throws LogicException When a secure listener is requested but OpenSSL
     *                        or the certificate path is missing.
     */
    public function __construct($i_Port = 0, $s_Address = null, $b_Secure = false)
    {
        if ($b_Secure) {
            if (!extension_loaded('openssl')) {
                throw new LogicException('Cannot create secure connection withouth the OpenSSL extension loaded.');
            }
            if (!file_exists(self::CERT_FILE_PATH)) {
                throw new LogicException('Cannot create secure connection withouth a certificate file.');
            }
        }

        $this->i_lPort    = $i_Port;
        // The requested bind address used to be discarded here.
        $this->s_lAddress = $s_Address;
        $this->b_Secure   = $b_Secure;
    }

    /**
     * @return bool Whether the status equals Status::LISTENING.
     */
    public function isListening()
    {
        return ($this->i_Status == Status::LISTENING);
    }

    /**
     * @return int One of the Status constants.
     */
    public function getStatus()
    {
        return $this->i_Status;
    }

    /**
     * @return int Number of connections accepted so far.
     */
    public function getAcceptedCount()
    {
        return $this->i_Accepted;
    }

    /**
     * @return int Unix timestamp of when listening started, 0 when not listening.
     */
    public function getListenStarted()
    {
        return $this->i_listenStart;
    }

    /**
     * @return int The configured port, or the system-assigned one after listen ().
     */
    public function getListenPort()
    {
        return $this->i_lPort;
    }

    /**
     * Returns the local name of the listening socket.
     *
     * @return string|false
     *
     * @throws LogicException When the stream is not listening.
     */
    public function getLocalAddress()
    {
        if (!$this->isListening()) {
            throw new LogicException('Cannot retrieve local address on stream that is not listening.');
        }

        return stream_socket_get_name($this->r_Socket, false);
    }

    /**
     * Binds and starts listening, then registers with the poller.
     *
     * @return bool True when listening, false on failure (onError is fired
     *              with Error::INITFAILED).
     */
    public function listen()
    {
        if ($this->isListening()) {
            return true;
        }

        $s_Scheme = 'tcp://';
        $a_Opts   = array();

        if ($this->b_Secure) {
            $s_Scheme       = 'ssl://';
            $a_Opts['ssl'] = array(
                'verify_peer'       => false,
                'allow_self_signed' => true,
                'cert_file'         => self::CERT_FILE_PATH,
                'passphrase'        => ''
            );
        }

        // Without an explicit address, bind to all IPv4 interfaces.
        $s_Host = ($this->s_lAddress === null || $this->s_lAddress === '') ? '0.0.0.0' : $this->s_lAddress;

        $i_Flags   = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
        $r_Context = stream_context_create($a_Opts);

        $this->r_Socket = stream_socket_server($s_Scheme . $s_Host . ':' . $this->i_lPort, $i_Errno, $s_Errstr, $i_Flags, $r_Context);

        if (!is_resource($this->r_Socket)) {
            $this->i_Status = Status::ERROR;
            $this->_doCallback('onError', array($this, Error::INITFAILED));

            return false;
        }

        stream_set_blocking($this->r_Socket, false);
        $this->i_Status      = Status::LISTENING;
        $this->i_listenStart = time();

        // Port 0 means the system chose a port; read it back from the socket
        // name, taking only the digits after the last colon.
        if ($this->i_lPort == 0) {
            $s_Local = (string) $this->getLocalAddress();
            $i_Colon = strrpos($s_Local, ':');
            if ($i_Colon !== false) {
                $this->i_lPort = (int) substr($s_Local, $i_Colon + 1);
            }
        }

        Poller::getInstance()->addConnection($this->r_Socket, $this);

        return true;
    }

    /**
     * Stops listening and fires onTerminate.
     *
     * @return bool Always true.
     */
    public function terminate()
    {
        if (!$this->isListening()) {
            return true;
        }

        $this->_forceClose();
        $this->i_Status      = Status::DISCONNECTED_LOCAL;
        $this->i_listenStart = 0;
        $this->_doCallback('onTerminate', array($this));

        return true;
    }

    /**
     * Accepts one pending connection and hands it to the onAccept callback.
     *
     * When no callable onAccept is set, the accepted connection is closed
     * again immediately.
     *
     * @return bool True when a connection was accepted, false on failure
     *              (onError is fired with Error::ACCEPTERR).
     *
     * @throws \RuntimeException When the stream is not listening.
     */
    public function accept()
    {
        if (!$this->isListening()) {
            // Fully qualified: this file does not import RuntimeException.
            throw new \RuntimeException('Cannot accept on a stream that is not listening.');
        }

        $r_Accepted = stream_socket_accept($this->r_Socket, 0, $s_Address);
        if ($r_Accepted === false) {
            $this->_doCallback('onError', array($this, Error::ACCEPTERR));

            return false;
        }

        $o_Socket = new Accept($r_Accepted, $s_Address, $this->b_Secure);
        $this->i_Accepted++;

        if (isset($this->onAccept) && is_callable($this->onAccept)) {
            Poller::getInstance()->addConnection($r_Accepted, $o_Socket);
            $this->_doCallback('onAccept', array($this, $o_Socket));
        } else {
            // Nobody is interested in the connection: drop it.
            $o_Socket->disconnect();
            unset($o_Socket);
        }

        return true;
    }

    /**
     * Shuts down and closes the socket resource and unregisters from the poller.
     */
    private function _forceClose()
    {
        if (!is_resource($this->r_Socket)) {
            return;
        }

        stream_socket_shutdown($this->r_Socket, STREAM_SHUT_RDWR);
        fclose($this->r_Socket);

        $this->r_Socket      = null;
        $this->i_listenStart = 0;

        Poller::getInstance()->removeConnection($this);
    }

    /**
     * Invokes the callback stored in the named public property, if callable.
     *
     * @param string $s_Event Property name, e.g. 'onAccept'.
     * @param array  $a_Args  Arguments passed to the callback.
     */
    private function _doCallback($s_Event, array $a_Args = array())
    {
        if (isset($this->$s_Event) && is_callable($this->$s_Event)) {
            call_user_func_array($this->$s_Event, $a_Args);
        }
    }
}
Poller.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
use \InvalidArgumentException;
use \RuntimeException;
/**
 * Singleton that multiplexes all registered connections with stream_select ().
 *
 * Connection and Listening objects register their socket here; each call to
 * pollConnections () builds the read/write/except sets from the current
 * status of every registered object and dispatches to accept (), read (),
 * write () or connect () on the ones that became ready.
 */
final class Poller
{
    // Instance of the poller
    private static $_instance = null;

    // Registered objects and their sockets, both keyed by the socket's resource id
    private $a_connList = array();
    private $a_connSocks = array();

    /**
     * @return Poller The shared instance, created on first use.
     */
    public static function getInstance()
    {
        if (self::$_instance === null) {
            self::$_instance = new Poller();
        }

        return self::$_instance;
    }

    // Allow no creation of this class from outside
    private function __construct()
    {
    }

    /**
     * @param Connection $o_Conn
     *
     * @return bool Whether this exact object is registered.
     */
    public function hasConnection(Connection $o_Conn)
    {
        return in_array($o_Conn, $this->a_connList, true);
    }

    /**
     * @return int Number of registered objects.
     */
    public function getCount()
    {
        return count($this->a_connList);
    }

    /**
     * Registers a socket together with the object that owns it.
     *
     * Registering an object that is already present is a no-op.
     *
     * @param resource             $r_Socket
     * @param Connection|Listening $o_Conn
     *
     * @return bool Always true.
     *
     * @throws InvalidArgumentException On a wrong object type or invalid resource.
     */
    public function addConnection($r_Socket, $o_Conn)
    {
        if (!$o_Conn instanceof Connection && !$o_Conn instanceof Listening) {
            throw new InvalidArgumentException('Poller :: addConnection () expects parameter 2 to be an instance of Connection or Listening.');
        }
        if (!is_resource($r_Socket)) {
            throw new InvalidArgumentException('Poller :: addConnection () expects parameter 1 to be a valid resource.');
        }
        // Identity comparison: a loose in_array () compares objects by value
        // and could mistake two similar connections for each other.
        if (in_array($o_Conn, $this->a_connList, true)) {
            return true;
        }

        $i_Id = (int) $r_Socket;
        $this->a_connList[$i_Id]  = $o_Conn;
        $this->a_connSocks[$i_Id] = $r_Socket;

        return true;
    }

    /**
     * Unregisters an object; unknown objects are ignored.
     *
     * @param Connection|Listening $o_Conn
     *
     * @return bool Always true.
     *
     * @throws InvalidArgumentException On a wrong object type.
     */
    public function removeConnection($o_Conn)
    {
        if (!$o_Conn instanceof Connection && !$o_Conn instanceof Listening) {
            throw new InvalidArgumentException('Poller :: removeConnection () expects parameter 1 to be an instance of Connection or Listening.');
        }

        $m_Key = array_search($o_Conn, $this->a_connList, true);
        if ($m_Key === false) {
            return true;
        }

        unset($this->a_connList[$m_Key], $this->a_connSocks[$m_Key]);

        return true;
    }

    /**
     * Waits for activity on the registered sockets and dispatches it.
     *
     * @param int|float|mixed $f_blockTime Seconds to wait (fractions allowed);
     *                                     any non-numeric value blocks until
     *                                     there is activity.
     *
     * @return bool True when at least one socket was ready, false otherwise.
     *
     * @throws RuntimeException When stream_select () fails.
     */
    public function pollConnections($f_blockTime = 0)
    {
        if ($this->getCount() < 1) {
            return false;
        }

        // Calculate time-out; stream_select () wants whole seconds plus microseconds.
        if (!is_float($f_blockTime) && !is_int($f_blockTime)) {
            $i_Seconds  = null;
            $i_mSeconds = null;
        } else {
            $i_Seconds  = (int) floor($f_blockTime);
            $i_mSeconds = (int) round(($f_blockTime - $i_Seconds) * 1e6);
        }

        // Create fd sets
        $a_readSocks   = array();
        $a_writeSocks  = array();
        $a_exceptSocks = array();

        foreach ($this->a_connList as $i_Id => $o_Connection) {
            $r_Socket = $this->a_connSocks[$i_Id];

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $a_writeSocks[] = $r_Socket;
                    break;

                case Status::CONNECTED:
                    $a_readSocks[]   = $r_Socket;
                    $a_exceptSocks[] = $r_Socket;
                    if ($o_Connection->hasWriteBuffer()) {
                        $a_writeSocks[] = $r_Socket;
                    }
                    break;

                case Status::LISTENING:
                    $a_readSocks[] = $r_Socket;
                    break;
            }
        }

        // Nothing is in a pollable state; stream_select () rejects empty input.
        if (!$a_readSocks && !$a_writeSocks && !$a_exceptSocks) {
            return false;
        }

        $i_Changed = stream_select($a_readSocks, $a_writeSocks, $a_exceptSocks, $i_Seconds, $i_mSeconds);
        if ($i_Changed === false) {
            throw new RuntimeException('Fatal error occurred while performing stream_select ().');
        }
        if ($i_Changed < 1) {
            return false;
        }

        /**
         * Readable sockets are sockets that:
         * 1. are listening and have an incoming connection
         * 2. have data available to be read
         * 3. have reached end-of-file
         */
        foreach ($a_readSocks as $r_Socket) {
            $o_Connection = $this->_lookup($r_Socket);
            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::LISTENING:
                    $o_Connection->accept();
                    break;
                case Status::CONNECTED:
                    $o_Connection->read();
                    break;
            }
        }

        /**
         * Writable sockets are sockets that:
         * 1. were connecting and are now connected
         * 2. can have data written to them
         */
        foreach ($a_writeSocks as $r_Socket) {
            $o_Connection = $this->_lookup($r_Socket);
            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $o_Connection->connect();
                    break;
                case Status::CONNECTED:
                    $o_Connection->write();
                    break;
            }
        }

        /**
         * Except sockets.
         * NOTE(review): only CONNECTED sockets are placed in the except set
         * above, so the CONNECTING branch below cannot currently trigger.
         */
        foreach ($a_exceptSocks as $r_Socket) {
            $o_Connection = $this->_lookup($r_Socket);
            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $o_Connection->connect();
                    break;
                case Status::CONNECTED:
                    // @todo
                    break;
            }
        }

        return true;
    }

    /**
     * Finds the object registered for a socket.
     *
     * A connection can close (and unregister) itself while an earlier socket
     * in the same poll is being handled, so a miss is expected here.
     *
     * @param resource $r_Socket
     *
     * @return Connection|Listening|null
     */
    private function _lookup($r_Socket)
    {
        $i_Id = (int) $r_Socket;

        return isset($this->a_connList[$i_Id]) ? $this->a_connList[$i_Id] : null;
    }
}
HttpRequest.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
use \LogicException;
use \InvalidArgumentException;
/**
 * Minimal asynchronous HTTP/1.1 client built on top of Connection.
 *
 * Create a request with doGet () or doPost (), set onSuccess / onFailure and
 * call execute (). The request is sent once the connection is established;
 * the response is delivered when the connection terminates:
 *
 *  - onSuccess (HttpRequest, array('headers' => string, 'body' => string))
 *  - onFailure (HttpRequest, int $errorCode)
 */
class HttpRequest
{
    private $o_Connection = null;
    // Header lines WITHOUT trailing CRLF; onConnect () adds the line breaks.
    private $a_Headers = array();
    private $s_Body = null;
    private $s_Site = null;
    private $s_Page = null;

    public $onSuccess = null;
    public $onFailure = null;

    /**
     * Builds a GET request.
     *
     * @param string $s_Site Host name.
     * @param string $s_Page Path, with or without a leading slash.
     *
     * @return HttpRequest
     */
    public static function doGet($s_Site, $s_Page = '')
    {
        $o_Request = new HttpRequest($s_Site, $s_Page);

        $o_Request->a_Headers[] = 'GET /' . ltrim((string) $s_Page, '/') . ' HTTP/1.1';
        $o_Request->a_Headers[] = 'Host: ' . $s_Site;
        $o_Request->a_Headers[] = 'Connection: close';

        return $o_Request;
    }

    /**
     * Builds a form-encoded POST request; without fields it falls back to GET.
     *
     * @param string $s_Site   Host name.
     * @param string $s_Page   Path, with or without a leading slash.
     * @param array  $a_Fields Form fields, encoded with http_build_query ().
     *
     * @return HttpRequest
     */
    public static function doPost($s_Site, $s_Page = '/', array $a_Fields = array())
    {
        if (count($a_Fields) < 1) {
            return self::doGet($s_Site, $s_Page);
        }

        $o_Request = new HttpRequest($s_Site, $s_Page);
        $s_Fields  = http_build_query($a_Fields);

        // ltrim () keeps the default '/' from producing "POST //".
        $o_Request->a_Headers[] = 'POST /' . ltrim((string) $s_Page, '/') . ' HTTP/1.1';
        $o_Request->a_Headers[] = 'Host: ' . $s_Site;
        $o_Request->a_Headers[] = 'Connection: close';
        $o_Request->a_Headers[] = 'Content-Type: application/x-www-form-urlencoded';
        $o_Request->a_Headers[] = 'Content-Length: ' . strlen($s_Fields);
        $o_Request->s_Body      = $s_Fields;

        return $o_Request;
    }

    /**
     * Creates the underlying connection to port 80 and hooks up its callbacks.
     *
     * @param string $s_Site Host name.
     * @param string $s_Page Path of the page.
     */
    public function __construct($s_Site, $s_Page)
    {
        $this->s_Site = $s_Site;
        $this->s_Page = $s_Page;

        $this->o_Connection              = new Connection($s_Site, 80);
        $this->o_Connection->onConnect   = array($this, 'onConnect');
        $this->o_Connection->onTerminate = array($this, 'onTerminate');
        $this->o_Connection->onError     = array($this, 'onError');
    }

    public function __destruct()
    {
        $this->o_Connection = null;
    }

    /**
     * Starts connecting.
     *
     * @return bool The result of Connection::connect ().
     */
    public function execute()
    {
        return $this->o_Connection->connect();
    }

    /**
     * Connection callback: queues the request in the connection's write buffer.
     *
     * @param Connection $o_Connection
     * @param int        $i_connTime
     *
     * @return bool Always true.
     */
    public function onConnect($o_Connection, $i_connTime)
    {
        // Header lines are joined by CRLF and closed by the blank line that
        // separates them from the (optional) body.
        $s_Request = implode("\r\n", $this->a_Headers) . "\r\n\r\n";

        if (!empty($this->s_Body)) {
            $s_Request .= $this->s_Body;
        }

        $o_Connection->writeBuffer->appendBuffer($s_Request);

        return true;
    }

    /**
     * Connection callback: forwards the error code to onFailure.
     *
     * @param Connection $o_Connection
     * @param int        $i_Error
     */
    public function onError($o_Connection, $i_Error)
    {
        if (isset($this->onFailure) && is_callable($this->onFailure)) {
            call_user_func_array($this->onFailure, array($this, $i_Error));
        }
    }

    /**
     * Connection callback: splits the response and forwards it to onSuccess.
     *
     * @param Connection $o_Connection
     * @param int        $i_whoClosed
     *
     * @return bool Always true.
     */
    public function onTerminate($o_Connection, $i_whoClosed)
    {
        $s_Raw = (string) $o_Connection->readBuffer->getBuffer();

        // Split on the FIRST blank line only, so a body that itself contains
        // CRLFCRLF stays intact; a response without a body yields ''.
        $a_Parts    = explode("\r\n\r\n", $s_Raw, 2);
        $a_Response = array(
            'headers' => $a_Parts[0],
            'body'    => isset($a_Parts[1]) ? $a_Parts[1] : ''
        );

        if (isset($this->onSuccess) && is_callable($this->onSuccess)) {
            call_user_func_array($this->onSuccess, array($this, $a_Response));
        }

        return true;
    }
}
StringBuffer.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
/**
 * Simple byte buffer with line-oriented helpers.
 *
 * Data is appended or prepended as raw bytes; getLine () and getLines ()
 * consume complete lines (terminated by the configured line ending) from the
 * front of the buffer. The tracked length always equals strlen () of the
 * buffer contents.
 */
class StringBuffer
{
    // Buffer contents, its length in bytes, and the line delimiter
    private $s_Buffer = '';
    private $i_Length = 0;
    private $s_lineEnd = "\n";

    /**
     * @param string $s_newEnding Delimiter that terminates a line.
     */
    public function setLineEnding($s_newEnding)
    {
        $this->s_lineEnd = (string) $s_newEnding;
    }

    /**
     * @return string The buffer contents (never null, as PHP requires a string here).
     */
    public function __toString()
    {
        return (string) $this->s_Buffer;
    }

    /**
     * @return string The buffer contents.
     */
    public function toString()
    {
        return (string) $this->s_Buffer;
    }

    /**
     * Alias of getLines (); note that it consumes the returned lines.
     *
     * @return array|false
     */
    public function toArray()
    {
        return $this->getLines();
    }

    /**
     * Adds data to the end of the buffer.
     *
     * @param string $s_String
     *
     * @return bool Always true.
     */
    public function appendBuffer($s_String)
    {
        $s_String = (string) $s_String;

        $this->s_Buffer .= $s_String;
        $this->i_Length += strlen($s_String);

        return true;
    }

    /**
     * Adds data to the beginning of the buffer.
     *
     * @param string $s_String
     *
     * @return bool Always true.
     */
    public function prependBuffer($s_String)
    {
        $s_String = (string) $s_String;

        $this->s_Buffer  = $s_String . $this->s_Buffer;
        $this->i_Length += strlen($s_String);

        return true;
    }

    /**
     * @return bool Whether the buffer holds at least one byte.
     */
    public function hasBuffer()
    {
        return ($this->i_Length > 0);
    }

    /**
     * @return bool Whether the buffer contains at least one line ending,
     *              including one at the very first byte.
     */
    public function hasLine()
    {
        return $this->_findLineEnd(false) !== false;
    }

    /**
     * @return int Buffer length in bytes.
     */
    public function getSize()
    {
        return $this->i_Length;
    }

    /**
     * Removes a number of bytes from the beginning or the end of the buffer.
     *
     * @param int  $i_Length  Number of bytes to remove; removing at least as
     *                        many bytes as are stored empties the buffer.
     * @param bool $b_Prepend True removes from the beginning, false from the end.
     *
     * @return bool Always true.
     *
     * @throws \InvalidArgumentException When $i_Length is negative.
     */
    public function removeLength($i_Length, $b_Prepend = true)
    {
        if ($i_Length < 0) {
            // Fully qualified: this file has no import for the exception.
            throw new \InvalidArgumentException('StringBuffer :: removeLength () expects parameter 1 to be positive.');
        }

        if ($i_Length >= $this->i_Length) {
            $this->s_Buffer = '';
            $this->i_Length = 0;

            return true;
        }

        if ($b_Prepend) {
            $this->s_Buffer = substr($this->s_Buffer, $i_Length);
        } else {
            // Keep everything except the last $i_Length bytes.
            $this->s_Buffer = substr($this->s_Buffer, 0, $this->i_Length - $i_Length);
        }
        $this->i_Length -= $i_Length;

        return true;
    }

    /**
     * @return string The whole buffer, without consuming it.
     */
    public function getBuffer()
    {
        return $this->s_Buffer;
    }

    /**
     * Empties the buffer.
     *
     * @return bool Always true.
     */
    public function flushBuffer()
    {
        $this->s_Buffer = '';
        $this->i_Length = 0;

        return true;
    }

    /**
     * Removes and returns the first line (without its line ending).
     *
     * @return string|false The line, or false when no complete line is buffered.
     */
    public function getLine()
    {
        $m_Offset = $this->_findLineEnd(false);
        if ($m_Offset === false) {
            return false;
        }

        $s_theLine = (string) substr($this->s_Buffer, 0, $m_Offset);
        $this->_consume($m_Offset + strlen($this->s_lineEnd));

        return $s_theLine;
    }

    /**
     * Removes and returns all complete lines (without their line endings).
     *
     * Data after the last line ending stays in the buffer.
     *
     * @return array|false List of lines, or false when no complete line is buffered.
     */
    public function getLines()
    {
        $m_Offset = $this->_findLineEnd(true);
        if ($m_Offset === false) {
            return false;
        }

        $s_Lines = (string) substr($this->s_Buffer, 0, $m_Offset);
        $this->_consume($m_Offset + strlen($this->s_lineEnd));

        return explode($this->s_lineEnd, $s_Lines);
    }

    /**
     * Locates the first or last line ending, matching bytes exactly so the
     * result agrees with the explode () call in getLines ().
     *
     * @param bool $b_Last Search for the last occurrence instead of the first.
     *
     * @return int|false Byte offset, or false when absent or the delimiter is empty.
     */
    private function _findLineEnd($b_Last)
    {
        if ($this->s_lineEnd === '' || $this->i_Length < 1) {
            return false;
        }

        return $b_Last
            ? strrpos($this->s_Buffer, $this->s_lineEnd)
            : strpos($this->s_Buffer, $this->s_lineEnd);
    }

    /**
     * Drops the given number of bytes from the front and updates the length.
     *
     * @param int $i_Bytes
     */
    private function _consume($i_Bytes)
    {
        $this->s_Buffer = (string) substr($this->s_Buffer, $i_Bytes);
        $this->i_Length = strlen($this->s_Buffer);
    }
}
Accept.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
use \LogicException;
use \InvalidArgumentException;
/**
 * Connection that wraps a socket accepted by a Listening stream.
 *
 * The socket is already connected, so the object starts in
 * Status::CONNECTED and is registered with the poller right away.
 */
class Accept extends Connection
{
    /**
     * @param resource $r_Socket  Socket returned by stream_socket_accept ().
     * @param string   $s_Address Peer name in "host:port" form.
     * @param bool     $b_Secure  Whether the listener uses the ssl:// transport.
     *
     * @throws InvalidArgumentException When the peer name carries no usable
     *                                  port (raised by the parent constructor).
     * @throws LogicException           When secure is requested but unavailable
     *                                  (raised by the parent constructor).
     */
    public function __construct($r_Socket, $s_Address, $b_Secure)
    {
        $s_Address = (string) $s_Address;

        // Split on the LAST colon and skip the colon itself, so the port is
        // numeric and addresses that contain colons themselves keep their
        // host part intact.
        $i_Colon = strrpos($s_Address, ':');
        if ($i_Colon === false) {
            $s_rAddress = $s_Address;
            $i_rPort    = 0; // rejected by the parent constructor
        } else {
            $s_rAddress = substr($s_Address, 0, $i_Colon);
            $i_rPort    = (int) substr($s_Address, $i_Colon + 1);
        }

        // Initiate connection object
        parent::__construct($s_rAddress, $i_rPort, null, $b_Secure);

        // The socket is already established.
        $this->r_Socket = $r_Socket;
        $this->i_Status = Status::CONNECTED;

        // Add socket to poller
        Poller::getInstance()->addConnection($r_Socket, $this);
    }
}
Status.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
/**
 * Status codes shared by Connection and Listening objects.
 *
 * Negative values are terminal states, zero is the initial state and
 * positive values are active states.
 */
abstract class Status
{
    // Initial state, before connect () or listen () was called
    const INIT = 0;

    // Active states
    const CONNECTING = 1;
    const CONNECTED = 2;
    const LISTENING = 3;

    // Terminal states
    const DISCONNECTED_LOCAL = -1;
    const DISCONNECTED_REMOTE = -2;
    const ERROR = -3;
}
Error.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* @package Streams
* @author Remco Pander <[email protected]>
* @version v1 initial release
* @copyright Copyright (c) 2012, Remco Pander
*/
namespace Streams;
/**
 * Error codes passed as the second argument to the onError callbacks.
 */
abstract class Error
{
    // Asynchronous connect did not succeed
    const CONNFAILED = 0;

    // The socket resource could not be created
    const INITFAILED = 1;

    // Writing to the socket failed
    const WRITEERR = 2;

    // Reading from the socket failed
    const READERR = 3;

    // Accepting an incoming connection failed
    const ACCEPTERR = 4;
}
Reacties
0