Scripts

Streams lib

Kleine library om non-blocking streams te maken over TCP. Handig als je meerdere verbindingen wilt maken of niet wilt wachten totdat een verbinding klaar is met verbinden/lezen/schrijven. Losjes gebaseerd op Node.js wat betreft het callback-idee. Voorbeelden van het gebruik kun je vinden op: https://github.com/Pannekoek/Streams/wiki/_pages

Connection.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;
use \LogicException;
use \InvalidArgumentException;

/**
 * Non-blocking client connection over tcp:// or ssl://.
 *
 * connect () creates the socket and registers it with the Poller singleton;
 * the Poller then calls connect (), read () and write () again as the socket
 * changes state. Events are reported through the public callback properties,
 * and incoming/outgoing data is held in two StringBuffer instances.
 */
class Connection
{
    // Global constants
    // NOTE(review): the default path is a directory, so the file_exists () check in the
    // constructor always passes, and "cert_file" is not a documented SSL context option
    // ("local_cert" is). Confirm the intended certificate handling before relying on it.
    const CERT_FILE_PATH     = './';
    // Not referenced anywhere inside this class.
    const CONN_MAX_TIME      = 5;

    // Connection details
    protected $r_Socket      = null;
    protected $i_Status      = 0;
    private $s_rAddress      = null;
    // Declared under the name the code actually uses; the original declared
    // $s_rPort but read and wrote $i_rPort, creating an undeclared dynamic property.
    private $i_rPort         = null;
    private $b_Secure        = false;
    private $s_bindTo        = null;
    private $i_timeConnected = 0;

    // Buffers for data (exposed read-only through __get ())
    private $readBuffer      = null;
    private $writeBuffer     = null;

    // Callbacks
    public $onConnect        = null;
    public $onTerminate      = null;
    public $onRead           = null;
    public $onWrite          = null;
    public $onError          = null;

    /**
     * Create a new, not yet connected, connection.
     *
     * @param string      $s_Address     Remote host name or IP address.
     * @param int         $i_Port        Remote port (2 - 65535).
     * @param string|null $s_bindAddress Optional local "address:port" to bind to.
     * @param bool        $b_Secure      Use ssl:// instead of tcp://.
     *
     * @throws InvalidArgumentException When the address or port is rejected.
     * @throws LogicException           When a secure connection cannot be set up.
     */
    public function __construct($s_Address, $i_Port, $s_bindAddress = null, $b_Secure = false)
    {
        // NOTE(review): gethostbyname () returns its argument unchanged on failure, so
        // this check effectively only rejects an empty address; it is kept as-is because
        // tightening it could reject addresses that stream_socket_client () accepts.
        $b_Unresolvable = !@gethostbyname($s_Address) && !@gethostbyaddr($s_Address);

        if ($b_Unresolvable || $i_Port > 65535 || $i_Port < 2) {
            throw new InvalidArgumentException('Address "' . $s_Address . ':' . $i_Port . '" could not be resolved or is invalid.');
        }

        $this->s_rAddress = $s_Address;
        $this->i_rPort    = $i_Port;
        $this->s_bindTo   = $s_bindAddress;
        $this->b_Secure   = $b_Secure;

        if ($this->b_Secure) {
            if (!extension_loaded('openssl')) {
                throw new LogicException('Cannot create secure connection withouth the OpenSSL extension loaded.');
            }

            if (!file_exists(self::CERT_FILE_PATH)) {
                throw new LogicException('Cannot create secure connection withouth a certificate file.');
            }
        }

        // Create buffers
        $this->readBuffer  = new StringBuffer();
        $this->writeBuffer = new StringBuffer();
    }

    /**
     * Closes the connection if it is still open and releases the buffers.
     */
    public function __destruct()
    {
        if ($this->isConnected()) {
            $this->disconnect();
        } else {
            $this->_forceClose();
        }

        $this->readBuffer  = null;
        $this->writeBuffer = null;
    }

    /**
     * Read-only access to the two buffers from outside the class.
     *
     * @param string $s_Member Property name.
     *
     * @return StringBuffer
     *
     * @throws LogicException For any property other than readBuffer/writeBuffer.
     */
    public function __get($s_Member)
    {
        switch ($s_Member) {
            case 'readBuffer':
                return $this->readBuffer;

            case 'writeBuffer':
                return $this->writeBuffer;

            default:
                throw new LogicException('Acces to "' . $s_Member . '" is not allowed.');
        }
    }

    /**
     * @return string The remote endpoint as "address:port".
     */
    public function __toString()
    {
        return $this->s_rAddress . ':' . $this->i_rPort;
    }

    /**
     * @return int Unix timestamp set when the connection was established, 0 otherwise.
     */
    public function getTimeConnected()
    {
        return $this->i_timeConnected;
    }

    /**
     * @return int The current Status constant.
     */
    public function getStatus()
    {
        return $this->i_Status;
    }

    /**
     * @return bool Whether the status is Status::CONNECTED.
     */
    public function isConnected()
    {
        return ($this->i_Status === Status::CONNECTED);
    }

    /**
     * @return bool Whether this connection was created as a secure connection.
     */
    public function isSecure()
    {
        return $this->b_Secure;
    }

    /**
     * @return bool Whether data is pending in the write buffer.
     */
    public function hasWriteBuffer()
    {
        return $this->writeBuffer->hasBuffer();
    }

    /**
     * @return bool Whether data is pending in the read buffer.
     */
    public function hasReadBuffer()
    {
        return $this->readBuffer->hasBuffer();
    }

    /**
     * Returns the name of the remote end of the socket.
     *
     * @return string|false
     *
     * @throws LogicException When the socket is not connected.
     */
    public function getRemoteAddress()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot receive remote address on socket that is not connected.');
        }

        return stream_socket_get_name($this->r_Socket, true);
    }

    /**
     * Returns the name of the local end of the socket.
     *
     * @return string|false
     *
     * @throws LogicException When the socket is not connected.
     */
    public function getLocalAddress()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot receive local address on socket that is not connected.');
        }

        return stream_socket_get_name($this->r_Socket, false);
    }

    /**
     * Sets the encoding for the stream.
     *
     * @param string $s_Encoding Encoding name as listed by mb_list_encodings ().
     *
     * @return bool False when the encoding is unknown, the stream is not open, or the
     *              runtime does not provide stream_encoding ().
     */
    public function setEncoding($s_Encoding)
    {
        if (!function_exists('mb_list_encodings') || !in_array($s_Encoding, mb_list_encodings())) {
            return false;
        }

        // Calling a missing function is a fatal error, so verify it exists first,
        // and never hand it a closed or not yet created socket.
        if (!function_exists('stream_encoding') || !is_resource($this->r_Socket)) {
            return false;
        }

        return stream_encoding($this->r_Socket, $s_Encoding);
    }

    /**
     * Sets the line ending used by both internal buffers.
     *
     * @param string $s_lineEnd
     *
     * @return bool Always true.
     */
    public function setLineEnding($s_lineEnd)
    {
        $this->readBuffer->setLineEnding($s_lineEnd);
        $this->writeBuffer->setLineEnding($s_lineEnd);

        return true;
    }

    /**
     * Starts, or completes, the asynchronous connect.
     *
     * The first call creates the socket and registers it with the Poller; the Poller
     * calls this method again once the socket reports a state change.
     *
     * @return bool True once connected; false while the connect is pending or after it failed.
     */
    public function connect()
    {
        if ($this->isConnected()) {
            return true;
        }

        if ($this->i_Status === Status::CONNECTING) {
            if (!feof($this->r_Socket)) {
                $this->i_Status        = Status::CONNECTED;
                $this->i_timeConnected = time();
                $this->_doCallback('onConnect', array($this, $this->i_timeConnected));

                return true;
            }

            // The poller reported the socket, but the connection failed. Close the
            // socket and unregister it so the poller does not keep a dead entry around.
            $this->i_Status = Status::ERROR;
            $this->_forceClose();
            $this->_doCallback('onError', array($this, Error::CONNFAILED));

            return false;
        }

        // Prepare context
        $a_Opts   = array();
        $s_Scheme = 'tcp://';

        // Set options for ssl connection
        if ($this->b_Secure) {
            $s_Scheme      = 'ssl://';
            $a_Opts['ssl'] = array(
                'verify_peer'       => false,
                'allow_self_signed' => true,
                'cert_file'         => self::CERT_FILE_PATH,
                'passphrase'        => ''
            );
        }

        // Set bind address
        if (!empty($this->s_bindTo)) {
            $a_Opts['socket'] = array('bindto' => $this->s_bindTo);
        }

        // Now, create the actual resource.
        $i_Flags        = STREAM_CLIENT_CONNECT | STREAM_CLIENT_ASYNC_CONNECT;
        $r_Context      = stream_context_create($a_Opts);
        $s_Target       = $s_Scheme . $this->s_rAddress . ':' . $this->i_rPort;
        $this->r_Socket = stream_socket_client($s_Target, $i_Errno, $s_Errstr, null, $i_Flags, $r_Context);

        // Check if we succeeded
        if (!is_resource($this->r_Socket)) {
            $this->i_Status = Status::ERROR;
            $this->_doCallback('onError', array($this, Error::INITFAILED));

            return false;
        }

        // Change blocking status.
        stream_set_blocking($this->r_Socket, false);
        $this->i_Status = Status::CONNECTING;

        // Insert to our poller object
        Poller::getInstance()->addConnection($this->r_Socket, $this);

        return false;
    }

    /**
     * Terminates the connection to the remote machine.
     *
     * Pending write data is sent on a best-effort basis before the socket is closed.
     *
     * @return bool Always true.
     */
    public function disconnect()
    {
        if (!$this->isConnected()) {
            return true;
        }

        // Check if there is data pending..
        if ($this->writeBuffer->hasBuffer()) {
            // Bypass write () because we do not want to send any callbacks anymore
            // and do not care for errors
            $i_Written = @fwrite($this->r_Socket, $this->writeBuffer->getBuffer());

            if ($i_Written !== false) {
                $this->writeBuffer->removeLength($i_Written);
            }
        }

        // Close it.
        $this->_forceClose();
        $this->i_Status = Status::DISCONNECTED_LOCAL;
        $this->_doCallback('onTerminate', array($this, Status::DISCONNECTED_LOCAL));

        return true;
    }

    /**
     * Attempts to write the contents of the write buffer to the remote machine.
     *
     * @return bool True when the write call succeeded (possibly partially), false on error.
     *
     * @throws LogicException When the socket is not connected.
     */
    public function write()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot perform write on socket that is not connected.');
        }

        // Cast so an empty buffer is written as '' rather than passed on as null.
        $i_Written = fwrite($this->r_Socket, (string) $this->writeBuffer->getBuffer());

        if ($i_Written === false) {
            $this->i_Status = Status::ERROR;
            $this->_forceClose();
            $this->_doCallback('onError', array($this, Error::WRITEERR));

            return false;
        }

        $this->writeBuffer->removeLength($i_Written);
        $this->_doCallback('onWrite', array($this, $this->writeBuffer, $i_Written));

        return true;
    }

    /**
     * Reads up to 8192 bytes from the stream into the read buffer.
     *
     * @return bool True when data was appended to the read buffer; false when the remote
     *              side closed, a read error occurred, or no data was available.
     *
     * @throws LogicException When the socket is not connected.
     */
    public function read()
    {
        if (!$this->isConnected()) {
            throw new LogicException('Cannot perform read on socket that is not connected.');
        }

        // Check for end-of-file
        if (feof($this->r_Socket)) {
            return $this->_remoteClosed();
        }

        $s_readString = fread($this->r_Socket, 8192);

        // Only a strict false is an error. The loose comparison used before also matched
        // '' and the perfectly valid payload "0", closing the connection for no reason.
        if ($s_readString === false) {
            $this->i_Status = Status::ERROR;
            $this->_forceClose();
            $this->_doCallback('onError', array($this, Error::READERR));

            return false;
        }

        if ($s_readString === '') {
            // Nothing arrived: either the peer closed in the meantime or there simply
            // was no payload to hand out yet.
            return feof($this->r_Socket) ? $this->_remoteClosed() : false;
        }

        // Append to read buffer
        $this->readBuffer->appendBuffer($s_readString);
        $this->_doCallback('onRead', array($this, $this->readBuffer, strlen($s_readString)));

        return true;
    }

    /**
     * Handles a close initiated by the remote side: closes the socket, updates the
     * status and fires onTerminate.
     *
     * @return bool Always false, so read () can return the result directly.
     */
    private function _remoteClosed()
    {
        $this->_forceClose();
        $this->i_Status = Status::DISCONNECTED_REMOTE;
        $this->_doCallback('onTerminate', array($this, Status::DISCONNECTED_REMOTE));

        return false;
    }

    /**
     * Closes the socket resource (if any) and unregisters this object from the Poller.
     */
    private function _forceClose()
    {
        if (!is_resource($this->r_Socket)) {
            return;
        }

        // Shutdown can fail on a socket that never finished connecting; that is not
        // interesting here because the resource is closed right after.
        @stream_socket_shutdown($this->r_Socket, STREAM_SHUT_RDWR);
        fclose($this->r_Socket);
        $this->r_Socket        = null;
        $this->i_timeConnected = 0;

        // Remove from poller
        Poller::getInstance()->removeConnection($this);
    }

    /**
     * Invokes the callback stored in the given public property, when it is callable.
     *
     * @param string $s_Event Name of the callback property (e.g. "onRead").
     * @param array  $a_Args  Arguments passed to the callback.
     */
    private function _doCallback($s_Event, array $a_Args = array())
    {
        if (isset($this->$s_Event) && is_callable($this->$s_Event)) {
            call_user_func_array($this->$s_Event, $a_Args);
        }
    }
}
Listening.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;
use \LogicException;
use \InvalidArgumentException;

/**
 * Non-blocking listening (server) socket over tcp:// or ssl://.
 *
 * listen () binds the socket and registers it with the Poller singleton; the Poller
 * calls accept () when a client is waiting. Accepted clients are wrapped in an Accept
 * object and handed to the onAccept callback.
 */
class Listening
{
    // Path to the PEM file handed to the SSL context as "local_cert".
    // The default is a directory and must be changed before a secure listener can be created.
    const CERT_FILE_PATH   = './';

    // Stream details
    private $r_Socket      = null;
    private $i_Status      = 0;
    private $s_lAddress    = null;
    private $i_lPort       = 0;
    private $i_listenStart = 0;
    private $i_Accepted    = 0;
    private $b_Secure      = false;

    // Callbacks
    public $onAccept       = null;
    public $onTerminate    = null;
    public $onError        = null;

    /**
     * @param int         $i_Port    Port to listen on; 0 lets the system pick one.
     * @param string|null $s_Address Local address to bind to; null binds to 0.0.0.0.
     * @param bool        $b_Secure  Use ssl:// instead of tcp://.
     *
     * @throws LogicException When a secure listener cannot be set up.
     */
    public function __construct($i_Port = 0, $s_Address = null, $b_Secure = false)
    {
        if ($b_Secure) {
            if (!extension_loaded('openssl')) {
                throw new LogicException('Cannot create secure connection withouth the OpenSSL extension loaded.');
            }

            // A directory is not a certificate, so require a regular file.
            if (!is_file(self::CERT_FILE_PATH)) {
                throw new LogicException('Cannot create secure connection withouth a certificate file.');
            }
        }

        $this->i_lPort    = $i_Port;
        // The address argument used to be discarded (null was always stored).
        $this->s_lAddress = $s_Address;
        $this->b_Secure   = $b_Secure;
    }

    /**
     * @return bool Whether the status is Status::LISTENING.
     */
    public function isListening()
    {
        return ($this->i_Status === Status::LISTENING);
    }

    /**
     * @return int The current Status constant.
     */
    public function getStatus()
    {
        return $this->i_Status;
    }

    /**
     * @return int Number of connections accepted so far.
     */
    public function getAcceptedCount()
    {
        return $this->i_Accepted;
    }

    /**
     * @return int Unix timestamp set when listening started, 0 when not listening.
     */
    public function getListenStarted()
    {
        return $this->i_listenStart;
    }

    /**
     * @return int The port given to the constructor, or the assigned port once listening.
     */
    public function getListenPort()
    {
        return $this->i_lPort;
    }

    /**
     * Returns the local name of the listening socket.
     *
     * @return string|false
     *
     * @throws LogicException When the stream is not listening.
     */
    public function getLocalAddress()
    {
        if (!$this->isListening()) {
            throw new LogicException('Cannot retrieve local address on stream that is not listening.');
        }

        return stream_socket_get_name($this->r_Socket, false);
    }

    /**
     * Start listening on the configured port and address.
     *
     * @return bool True when listening, false when the socket could not be created.
     */
    public function listen()
    {
        if ($this->isListening()) {
            return true;
        }

        $s_Scheme = 'tcp://';
        $a_Opts   = array();

        // Set options for ssl connection
        if ($this->b_Secure) {
            $s_Scheme      = 'ssl://';
            $a_Opts['ssl'] = array(
                'verify_peer'       => false,
                'allow_self_signed' => true,
                // "local_cert" is the documented context option for the certificate.
                'local_cert'        => self::CERT_FILE_PATH,
                'passphrase'        => ''
            );
        }

        // Without an explicit address, bind to all IPv4 interfaces.
        $s_Address      = empty($this->s_lAddress) ? '0.0.0.0' : $this->s_lAddress;
        $i_Flags        = STREAM_SERVER_BIND | STREAM_SERVER_LISTEN;
        $r_Context      = stream_context_create($a_Opts);
        $this->r_Socket = stream_socket_server($s_Scheme . $s_Address . ':' . $this->i_lPort, $i_Errno, $s_Errstr, $i_Flags, $r_Context);

        // Check if we succeeded
        if (!is_resource($this->r_Socket)) {
            $this->i_Status = Status::ERROR;
            $this->_doCallback('onError', array($this, Error::INITFAILED));

            return false;
        }

        // Change blocking status
        stream_set_blocking($this->r_Socket, false);
        $this->i_Status      = Status::LISTENING;
        $this->i_listenStart = time();

        // Fetch the port on which we are listening now..
        if ($this->i_lPort == 0) {
            $s_Local    = (string) $this->getLocalAddress();
            $i_Colon    = strrpos($s_Local, ':');

            if ($i_Colon !== false) {
                // Skip the colon itself and store a real integer.
                $this->i_lPort = (int) substr($s_Local, $i_Colon + 1);
            }
        }

        // Add to poller
        Poller::getInstance()->addConnection($this->r_Socket, $this);

        return true;
    }

    /**
     * Stop listening.
     *
     * @return bool Always true.
     */
    public function terminate()
    {
        if (!$this->isListening()) {
            return true;
        }

        // Close stream
        $this->_forceClose();
        $this->i_Status      = Status::DISCONNECTED_LOCAL;
        $this->i_listenStart = 0;
        $this->_doCallback('onTerminate', array($this));

        return true;
    }

    /**
     * Accept an incoming connection and pass it on to the onAccept callback.
     *
     * When no callable onAccept is set, the accepted connection is closed again.
     *
     * @return bool False when accepting failed, true otherwise.
     *
     * @throws \RuntimeException When the stream is not listening.
     */
    public function accept()
    {
        if (!$this->isListening()) {
            // Fully qualified: this file does not import RuntimeException.
            throw new \RuntimeException('Cannot accept on a stream that is not listening.');
        }

        // Fetch the resource
        $r_Socket = stream_socket_accept($this->r_Socket, 0, $s_Address);

        if ($r_Socket === false) {
            $this->_doCallback('onError', array($this, Error::ACCEPTERR));

            return false;
        }

        $o_Socket = new Accept($r_Socket, $s_Address, $this->b_Secure);
        $this->i_Accepted++;

        // Callback,
        // if it has none, disconnect the accepted connection and remove it.
        // (The check used an undefined variable before, so this branch never ran.)
        if (isset($this->onAccept) && is_callable($this->onAccept)) {
            Poller::getInstance()->addConnection($r_Socket, $o_Socket);
            $this->_doCallback('onAccept', array($this, $o_Socket));
        } else {
            $o_Socket->disconnect();
            unset($o_Socket);
        }

        return true;
    }

    /**
     * Closes the socket resource (if any) and unregisters this object from the Poller.
     */
    private function _forceClose()
    {
        if (!is_resource($this->r_Socket)) {
            return;
        }

        // Close the socket resource
        stream_socket_shutdown($this->r_Socket, STREAM_SHUT_RDWR);
        fclose($this->r_Socket);
        $this->r_Socket      = null;
        $this->i_listenStart = 0;

        // Remove from poller
        Poller::getInstance()->removeConnection($this);
    }

    /**
     * Invokes the callback stored in the given public property, when it is callable.
     *
     * @param string $s_Event Name of the callback property (e.g. "onAccept").
     * @param array  $a_Args  Arguments passed to the callback.
     */
    private function _doCallback($s_Event, array $a_Args = array())
    {
        if (isset($this->$s_Event) && is_callable($this->$s_Event)) {
            call_user_func_array($this->$s_Event, $a_Args);
        }
    }
}
Poller.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;
use \InvalidArgumentException;
use \RuntimeException;

/**
 * Singleton that multiplexes all registered Connection and Listening objects with
 * stream_select () and dispatches accept (), connect (), read () and write () calls.
 *
 * Both internal arrays are keyed by the integer id of the socket resource.
 */
final class Poller
{
    // Instance of the poller
    private static $_instance = null;

    // Registered objects and their socket resources, keyed by resource id
    private $a_connList  = array();
    private $a_connSocks = array();

    /**
     * @return Poller The single shared instance.
     */
    public static function getInstance()
    {
        if (self::$_instance === null) {
            self::$_instance = new self();
        }

        return self::$_instance;
    }

    // Allow no creation of this class from outside
    private function __construct()
    {
    }

    /**
     * @param Connection $o_Conn
     *
     * @return bool Whether this exact object is registered.
     */
    public function hasConnection(Connection $o_Conn)
    {
        return in_array($o_Conn, $this->a_connList, true);
    }

    /**
     * @return int Number of registered objects.
     */
    public function getCount()
    {
        return count($this->a_connList);
    }

    /**
     * Registers a socket together with the object that owns it.
     *
     * @param resource             $r_Socket
     * @param Connection|Listening $o_Conn
     *
     * @return bool Always true (also when the object was already registered).
     *
     * @throws InvalidArgumentException On a wrong object type or an invalid resource.
     */
    public function addConnection($r_Socket, $o_Conn)
    {
        if (!$o_Conn instanceof Connection && !$o_Conn instanceof Listening) {
            throw new InvalidArgumentException('Poller :: addConnection () expects parameter 2 to be an instance of Connection or Listening.');
        }

        if (!is_resource($r_Socket)) {
            throw new InvalidArgumentException('Poller :: addConnection () expects parameter 1 to be a valid resource.');
        }

        // Identity comparison: a loose in_array () compares objects property by
        // property and can mistake two distinct but similar connections for each other.
        if (in_array($o_Conn, $this->a_connList, true)) {
            return true;
        }

        $i_Intval                     = (int) $r_Socket;
        $this->a_connList[$i_Intval]  = $o_Conn;
        $this->a_connSocks[$i_Intval] = $r_Socket;

        return true;
    }

    /**
     * Unregisters an object; unknown objects are ignored.
     *
     * @param Connection|Listening $o_Conn
     *
     * @return bool Always true.
     *
     * @throws InvalidArgumentException On a wrong object type.
     */
    public function removeConnection($o_Conn)
    {
        if (!$o_Conn instanceof Connection && !$o_Conn instanceof Listening) {
            throw new InvalidArgumentException('Poller :: removeConnection () expects parameter 1 to be an instance of Connection or Listening.');
        }

        // One strict search decides both "is it registered" and "under which key", so a
        // failed lookup can never be used as an array key by accident.
        $i_Intval = array_search($o_Conn, $this->a_connList, true);

        if ($i_Intval === false) {
            return true;
        }

        unset($this->a_connList[$i_Intval]);
        unset($this->a_connSocks[$i_Intval]);

        return true;
    }

    /**
     * Waits for activity on the registered sockets and dispatches it.
     *
     * @param int|float|mixed $f_blockTime Seconds to block (fractions allowed); any value
     *                                     that is not an int or float blocks indefinitely.
     *
     * @return bool True when at least one socket reported activity, false otherwise.
     *
     * @throws RuntimeException When stream_select () fails.
     */
    public function pollConnections($f_blockTime = 0)
    {
        if ($this->getCount() < 1) {
            return false;
        }

        // Calculate time-out; stream_select () wants integer seconds and microseconds
        if (!is_float($f_blockTime) && !is_int($f_blockTime)) {
            $i_Seconds  = null;
            $i_mSeconds = null;
        } else {
            $i_Seconds  = (int) floor($f_blockTime);
            $i_mSeconds = (int) min(999999, round(($f_blockTime - $i_Seconds) * 1e6));
        }

        // Create fd sets
        $a_readSocks = $a_writeSocks = $a_exceptSocks = array();

        foreach ($this->a_connList as $i_Intval => $o_Connection) {
            $r_Socket = $this->a_connSocks[$i_Intval];

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $a_writeSocks[] = $r_Socket;
                    break;

                case Status::CONNECTED:
                    $a_readSocks[]   = $r_Socket;
                    $a_exceptSocks[] = $r_Socket;

                    if ($o_Connection->hasWriteBuffer()) {
                        $a_writeSocks[] = $r_Socket;
                    }

                    break;

                case Status::LISTENING:
                    $a_readSocks[] = $r_Socket;
                    break;
            }
        }

        // stream_select () rejects a call without any stream to watch.
        if (count($a_readSocks) + count($a_writeSocks) + count($a_exceptSocks) < 1) {
            return false;
        }

        // Here comes the magic part
        $i_Changed = stream_select($a_readSocks, $a_writeSocks, $a_exceptSocks, $i_Seconds, $i_mSeconds);

        if ($i_Changed === false) {
            throw new RuntimeException('Fatal error occurred while performing stream_select ().');
        }

        if ($i_Changed < 1) {
            return false;
        }

        /**
        * Readable sockets are sockets that:
        * 1. listening for connections with an incoming connection
        * 2. data is avaliable to be read
        * 3. end-of-file has been reached
        */
        foreach ($a_readSocks as $r_Socket) {
            $o_Connection = $this->_findConnection($r_Socket);

            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::LISTENING:
                    $o_Connection->accept();
                    break;

                case Status::CONNECTED:
                    $o_Connection->read();
                    break;
            }
        }

        /**
        * Writable sockets are sockets that:
        * 1. sockets that were connecting and now are connected
        * 2. data can be written
        */
        foreach ($a_writeSocks as $r_Socket) {
            $o_Connection = $this->_findConnection($r_Socket);

            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $o_Connection->connect();
                    break;

                case Status::CONNECTED:
                    $o_Connection->write();
                    break;
            }
        }

        /**
        * Except sockets are sockets that:
        * 1. sockets that were connecting and it failed
        * 2. Out-Of-Band data can be read
        */
        foreach ($a_exceptSocks as $r_Socket) {
            $o_Connection = $this->_findConnection($r_Socket);

            if ($o_Connection === null) {
                continue;
            }

            switch ($o_Connection->getStatus()) {
                case Status::CONNECTING:
                    $o_Connection->connect();
                    break;

                case Status::CONNECTED:
                    // @todo
                    break;
            }
        }

        return true;
    }

    /**
     * Looks up the object registered for a socket.
     *
     * A callback earlier in the same poll may have closed and unregistered the
     * connection, so a missing entry is expected and reported as null.
     *
     * @param resource $r_Socket
     *
     * @return Connection|Listening|null
     */
    private function _findConnection($r_Socket)
    {
        $i_Intval = (int) $r_Socket;

        return isset($this->a_connList[$i_Intval]) ? $this->a_connList[$i_Intval] : null;
    }
}
HttpRequest.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;
use \LogicException;
use \InvalidArgumentException;

/**
 * Minimal asynchronous HTTP/1.1 client (port 80) built on top of Connection.
 *
 * Use doGet () or doPost () to build a request and execute () to start it. Once the
 * connection terminates, the collected response is split into headers and body and
 * passed to onSuccess; connection errors are passed to onFailure.
 */
class HttpRequest
{
    private $o_Connection = null;
    // Request line and header lines, stored WITHOUT trailing CRLF
    private $a_Headers    = null;
    private $s_Body       = null;
    private $s_Site       = null;
    private $s_Page       = null;

    public $onSuccess     = null;
    public $onFailure     = null;

    /**
     * Builds a GET request.
     *
     * @param string $s_Site Host name.
     * @param string $s_Page Path on the host, with or without a leading slash.
     *
     * @return HttpRequest
     */
    public static function doGet($s_Site, $s_Page = '')
    {
        $conn = new HttpRequest($s_Site, $s_Page);

        // Lines carry no CRLF of their own; onConnect () joins and terminates them.
        $conn->a_Headers[] = 'GET /' . ltrim($s_Page, '/') . ' HTTP/1.1';
        $conn->a_Headers[] = 'Host: ' . $s_Site;
        $conn->a_Headers[] = 'Connection: close';

        return $conn;
    }

    /**
     * Builds a form-encoded POST request; without fields a GET request is built instead.
     *
     * @param string $s_Site   Host name.
     * @param string $s_Page   Path on the host, with or without a leading slash.
     * @param array  $a_Fields Form fields, encoded with http_build_query ().
     *
     * @return HttpRequest
     */
    public static function doPost($s_Site, $s_Page = '/', array $a_Fields = array())
    {
        if (count($a_Fields) < 1) {
            return self::doGet($s_Site, $s_Page);
        }

        $conn     = new HttpRequest($s_Site, $s_Page);
        $s_Fields = http_build_query($a_Fields);

        // Strip leading slashes so the default page "/" does not become "//".
        $conn->a_Headers[] = 'POST /' . ltrim($s_Page, '/') . ' HTTP/1.1';
        $conn->a_Headers[] = 'Host: ' . $s_Site;
        $conn->a_Headers[] = 'Connection: close';
        $conn->a_Headers[] = 'Content-Type: application/x-www-form-urlencoded';
        $conn->a_Headers[] = 'Content-Length: ' . strlen($s_Fields);
        $conn->s_Body      = $s_Fields;

        return $conn;
    }

    /**
     * Creates the underlying connection to port 80 and wires up its callbacks.
     *
     * @param string $s_Site Host name.
     * @param string $s_Page Path on the host.
     */
    public function __construct($s_Site, $s_Page)
    {
        $this->s_Site = $s_Site;
        $this->s_Page = $s_Page;

        // Create connection and set callbacks.
        $this->o_Connection              = new Connection($s_Site, 80);
        $this->o_Connection->onConnect   = array($this, 'onConnect');
        $this->o_Connection->onTerminate = array($this, 'onTerminate');
        $this->o_Connection->onError     = array($this, 'onError');
    }

    public function __destruct()
    {
        $this->o_Connection = null;
    }

    /**
     * Starts the request by connecting.
     *
     * @return bool The result of Connection::connect ().
     */
    public function execute()
    {
        return $this->o_Connection->connect();
    }

    /**
     * Connection callback: queues the request in the connection's write buffer.
     *
     * @param object $o_Connection The connection that was established.
     * @param int    $i_connTime   Time of connecting (unused).
     *
     * @return bool Always true.
     */
    public function onConnect($o_Connection, $i_connTime)
    {
        // Header lines are separated by CRLF and the header block ends with an empty
        // line; only after that empty line may the body follow.
        $s_Request = implode("\r\n", (array) $this->a_Headers) . "\r\n\r\n";

        if ($this->s_Body !== null && $this->s_Body !== '') {
            $s_Request .= $this->s_Body;
        }

        $o_Connection->writeBuffer->appendBuffer($s_Request);

        return true;
    }

    /**
     * Connection callback: forwards the error to onFailure.
     *
     * @param object $o_Connection
     * @param mixed  $i_Error
     */
    public function onError($o_Connection, $i_Error)
    {
        if (isset($this->onFailure) && is_callable($this->onFailure)) {
            call_user_func_array($this->onFailure, array($this, $i_Error));
        }
    }

    /**
     * Connection callback: splits the response and passes it to onSuccess as
     * array ('headers' => string, 'body' => string).
     *
     * @param object $o_Connection
     * @param int    $i_whoClosed
     *
     * @return bool Always true.
     */
    public function onTerminate($o_Connection, $i_whoClosed)
    {
        // Split on the FIRST empty line only, so empty lines inside the body stay
        // part of the body.
        $s_httpResponse = (string) $o_Connection->readBuffer->getBuffer();
        $a_Parts        = explode("\r\n\r\n", $s_httpResponse, 2);
        $a_httpResponse = array(
            'headers' => $a_Parts[0],
            'body'    => isset($a_Parts[1]) ? $a_Parts[1] : ''
        );

        if (isset($this->onSuccess) && is_callable($this->onSuccess)) {
            call_user_func_array($this->onSuccess, array($this, $a_httpResponse));
        }

        return true;
    }
}
StringBuffer.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;

/**
 * Simple byte buffer with line-oriented helpers.
 *
 * The buffer is a plain string; $i_Length always equals its length in bytes.
 */
class StringBuffer
{
    // Buffer contents, its length in bytes, and the line terminator used by the line helpers
    private $s_Buffer  = '';
    private $i_Length  = 0;
    private $s_lineEnd = "\n";

    /**
     * @param string $s_newEnding Line terminator used by hasLine (), getLine () and getLines ().
     */
    public function setLineEnding($s_newEnding)
    {
        $this->s_lineEnd = (string) $s_newEnding;
    }

    /**
     * @return string The buffer contents (an empty string when the buffer is empty).
     */
    public function __toString()
    {
        // __toString () must return a string, never null.
        return (string) $this->s_Buffer;
    }

    /**
     * @return string The buffer contents.
     */
    public function toString()
    {
        return (string) $this->s_Buffer;
    }

    /**
     * Alias of getLines (); note that it removes the returned lines from the buffer.
     *
     * @return array|false
     */
    public function toArray()
    {
        return $this->getLines();
    }

    /**
     * Adds one or more characters to the end of the buffer.
     *
     * @param string $s_String
     *
     * @return bool Always true.
     */
    public function appendBuffer($s_String)
    {
        $s_String        = (string) $s_String;
        $this->s_Buffer .= $s_String;
        $this->i_Length += strlen($s_String);

        return true;
    }

    /**
     * Adds one or more characters to the beginning of the buffer.
     *
     * @param string $s_String
     *
     * @return bool Always true.
     */
    public function prependBuffer($s_String)
    {
        $s_String        = (string) $s_String;
        $this->s_Buffer  = $s_String . $this->s_Buffer;
        $this->i_Length += strlen($s_String);

        return true;
    }

    /**
     * @return bool Whether the buffer holds at least one byte.
     */
    public function hasBuffer()
    {
        return ($this->i_Length > 0);
    }

    /**
     * @return bool Whether the buffer contains at least one complete (terminated) line.
     */
    public function hasLine()
    {
        // Compare against false explicitly: a terminator at offset 0 is still a line.
        return $this->s_lineEnd !== ''
            && strpos($this->s_Buffer, $this->s_lineEnd) !== false;
    }

    /**
     * @return int Buffer length in bytes.
     */
    public function getSize()
    {
        return $this->i_Length;
    }

    /**
     * Removes a number of bytes from the beginning or the end of the buffer.
     *
     * @param int  $i_Length  Number of bytes to remove; the buffer is emptied when this
     *                        is at least the buffer size.
     * @param bool $b_Prepend True removes from the beginning, false from the end.
     *
     * @return bool Always true.
     *
     * @throws \InvalidArgumentException When $i_Length is negative.
     */
    public function removeLength($i_Length, $b_Prepend = true)
    {
        if ($i_Length < 0) {
            // Fully qualified: this file has no import for the exception class.
            throw new \InvalidArgumentException('StringBuffer :: removeLength () expects parameter 1 to be positive.');
        }

        if ($i_Length >= $this->i_Length) {
            $this->s_Buffer = '';
            $this->i_Length = 0;

            return true;
        }

        $i_Remaining = $this->i_Length - $i_Length;

        // Remove from beginning or end?
        if ($b_Prepend) {
            $this->s_Buffer = substr($this->s_Buffer, $i_Length);
        } else {
            // Keep everything except the last $i_Length bytes.
            $this->s_Buffer = substr($this->s_Buffer, 0, $i_Remaining);
        }

        $this->i_Length = $i_Remaining;

        return true;
    }

    /**
     * @return string All of the buffer.
     */
    public function getBuffer()
    {
        return $this->s_Buffer;
    }

    /**
     * Clears the buffer.
     *
     * @return bool Always true.
     */
    public function flushBuffer()
    {
        $this->s_Buffer = '';
        $this->i_Length = 0;

        return true;
    }

    /**
     * Removes and returns the first line of the buffer, without its terminator.
     *
     * @return string|false The line (possibly an empty string), or false when the buffer
     *                      holds no complete line. Compare with === false.
     */
    public function getLine()
    {
        if (!$this->hasLine()) {
            return false;
        }

        $i_Offset  = strpos($this->s_Buffer, $this->s_lineEnd);
        $s_theLine = (string) substr($this->s_Buffer, 0, $i_Offset);

        $this->_consume($i_Offset + strlen($this->s_lineEnd));

        return $s_theLine;
    }

    /**
     * Removes and returns every complete line in the buffer, without terminators.
     * Data after the last terminator stays in the buffer.
     *
     * @return array|false List of lines, or false when the buffer holds no complete line.
     */
    public function getLines()
    {
        if (!$this->hasLine()) {
            return false;
        }

        $i_Offset = strrpos($this->s_Buffer, $this->s_lineEnd);
        $s_Lines  = (string) substr($this->s_Buffer, 0, $i_Offset);

        $this->_consume($i_Offset + strlen($this->s_lineEnd));

        return explode($this->s_lineEnd, $s_Lines);
    }

    /**
     * Drops the first $i_Bytes bytes and re-synchronises the stored length.
     *
     * @param int $i_Bytes
     */
    private function _consume($i_Bytes)
    {
        $this->s_Buffer = (string) substr($this->s_Buffer, $i_Bytes);
        $this->i_Length = strlen($this->s_Buffer);
    }
}
Accept.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;
use \LogicException;
use \InvalidArgumentException;

/**
 * Connection object for a socket that already exists, instead of one that
 * still has to be connected. It is created in the CONNECTED state and
 * registers itself with the poller.
 */
class Accept extends Connection
{
		/**
		 * @param mixed  $r_Socket   The socket to wrap (presumably the stream returned
		 *                           by accepting a client on a listener; confirm with caller).
		 * @param string $s_Address  Remote peer name in "address:port" form.
		 * @param bool   $b_Secure   Passed through unchanged to the parent constructor.
		 */
		public function __construct ($r_Socket, $s_Address, $b_Secure)
		{
				// Separate port from address. Both halves are split on the LAST colon,
				// so addresses that contain colons themselves keep their full host
				// part, and the port no longer carries the colon along.
				$i_Separator = strrpos ($s_Address, ':');
				
				if ($i_Separator === false)
				{
						// No port in the peer name: keep the address intact.
						// NOTE(review): confirm the parent constructor accepts a null port.
						$s_rAddress = $s_Address;
						$s_rPort    = null;
				}
				else
				{
						$s_rAddress = substr ($s_Address, 0, $i_Separator);
						$s_rPort    = substr ($s_Address, $i_Separator + 1);
				}
				
				// Initiate connection object
				parent :: __construct ($s_rAddress, $s_rPort, null, $b_Secure);
				
				// The socket already exists and is connected, so overwrite
				// the state set up by the parent constructor.
				$this -> r_Socket = $r_Socket;
				$this -> i_Status = Status :: CONNECTED;
				
				// Add socket to poller
				Poller :: getInstance () -> addConnection ($r_Socket, $this);
		}
}
Status.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;

/**
 * Numeric status codes of a connection.
 *
 * The class is abstract because it only groups constants and cannot be
 * instantiated. Values below zero are the error and disconnected states,
 * zero is the initial state, values above zero are the connecting,
 * connected and listening states.
 */
abstract class Status
{
		// Initial state
		const INIT                = 0;
		
		// Active states
		const CONNECTING          = 1;
		const CONNECTED           = 2;
		const LISTENING           = 3;
		
		// Ended states
		const DISCONNECTED_LOCAL  = -1;
		const DISCONNECTED_REMOTE = -2;
		const ERROR               = -3;
}
Error.php
<?php
/**
* Streams library
* Copyright (c) 2012, Remco Pander
*
* This library consists of a set of classes that can be used to create high-performing
* applications using full-duplex connections. It uses the built-in stream functions
* provided with php.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
* 
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
* 
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*
* @package    Streams
* @author     Remco Pander <[email protected]>
* @version    v1 initial release
* @copyright  Copyright (c) 2012, Remco Pander
*/

namespace Streams;

/**
 * Numeric error codes of the Streams library.
 *
 * The class is abstract because it only groups constants and cannot be
 * instantiated.
 *
 * NOTE(review): the names suggest connect, initialisation, write, read and
 * accept failures respectively; confirm against the code that reports them.
 */
abstract class Error
{
		const CONNFAILED = 0;
		const INITFAILED = 1;
		
		const WRITEERR   = 2;
		const READERR    = 3;
		
		const ACCEPTERR  = 4;
}

Reacties

0
Nog geen reacties.