Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Saransaran/php-class-project
  • sibidharan/php-class-project
  • Madhan1024/php-class-project
  • GopiKrishnan/photogram
  • Mhd_khalid/php-class-project
  • At_muthu__/php-class-project
  • jaganbhaskar155/php-class-project
  • hariharanrd/php-class-project
  • jasper715/php-class-project
  • hanuRakesh/photogram-project-main
  • Yuvaraj21/photogram
  • ram_rogers/php-class-project
  • Hihelloboy/php-class-project
  • Nadarajan/php-class-project
  • srisanthosh156/php-class-project
  • Buvaneshwaran.k/php-class-project
  • umarfarooq07/php-class-project
  • Dhanaprakash/php-class-project
  • jashwanth142003/php-class-project
  • Esakkiraja/php-class-project
  • Boomi/php-class-project
  • Kishore2071/php-class-project
  • Ram123raj/php-class-project
  • aswinkumar27/php-class-project
  • dhilipdhilip9655/php-class-project
  • Manikandam143/php-class-project
  • VikramS/php-class-project
  • ArnoldSam/php-class-project
  • gowthamapandi0008/php-class-project
  • d.barath7639/php-class-project
  • shyalandran/php-class-project
  • kiruba_432/php-class-project
  • razakias001/php-class-project
  • kannan.b2745/php-class-project
  • sathish236tsk/php-class-project
  • rii/php-class-project
  • jonathajh4k/php-class-project
  • Neelagandan_G/php-class-project
  • Tholkappiar2003/php-class-project
  • kamaleshselvam75/php-class-project
  • devapriyan/php-class-project
  • sanojahamed/php-class-project
  • rizwankendo/php-class-project
  • senthamilselvan18000/php-class-project
  • rajeshd01/php-class-project
  • Florence/php-class-project
  • vishnu191299/php-class-project
  • Rakeshrakki/php-class-project
  • sanjay057/php-class-project
  • amarsanthoshsanthosh/photogram-project-cp
  • md_ashmar/php-class-project
  • k.nandhishwaran777k/php-class-project
52 results
Show changes
Showing
with 3201 additions and 0 deletions
<?php
namespace PhpAmqpLib\Wire;
class AMQPArray extends AMQPAbstractCollection
{
/**
* @param array|null $data
*/
public function __construct(array $data = null)
{
parent::__construct(empty($data) ? null : array_values($data));
}
/**
* @return int
*/
final public function getType()
{
return self::T_ARRAY;
}
/**
* @param mixed $val
* @param int|null $type
* @return $this
*/
public function push($val, $type = null)
{
$this->setValue($val, $type);
return $this;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPDataReadException;
class AMQPBufferReader extends AMQPReader
{
/**
* @var string
*/
private $buffer;
/**
* @var int
*/
private $length;
public function __construct(string $buffer)
{
$this->buffer = $buffer;
$this->length = mb_strlen($buffer, 'ASCII');
}
public function close(): void
{
}
/**
* Resets the object from the injected param
*
* Used to not need to create a new AMQPBufferReader instance every time.
* when we can just pass a string and reset the object state.
* NOTE: since we are working with strings we don't need to pass an AbstractIO
* or a timeout.
*
* @param string $str
*/
public function reset(string $str): void
{
$this->buffer = $str;
$this->length = mb_strlen($this->buffer, 'ASCII');
$this->offset = 0;
$this->resetCounters();
}
protected function rawread(int $n): string
{
if ($this->length < $n) {
throw new AMQPDataReadException(sprintf(
'Error reading data. Requested %s bytes while string buffer has only %s',
$n,
$this->length
));
}
$res = mb_substr($this->buffer, 0, $n, 'ASCII');
$this->buffer = mb_substr($this->buffer, $n, null, 'ASCII');
$this->length -= $n;
$this->offset += $n;
return $res;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Helper\BigInteger;
abstract class AMQPByteStream
{
public const BIT = 1;
public const OCTET = 1;
public const SHORTSTR = 1;
public const SHORT = 2;
public const LONG = 4;
public const SIGNED_LONG = 4;
public const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
public const LONGLONG = 8;
public const TIMESTAMP = 8;
/** @var bool */
protected const PLATFORM_64BIT = PHP_INT_SIZE === 8;
/** @var BigInteger[][] */
protected static $bigIntegers = array();
/**
* @var bool
*/
protected static $isLittleEndian;
/**
* Converts byte-string between native and network byte order, in both directions
*
* @param string $bytes
* @return string
*/
protected function correctEndianness($bytes)
{
return self::isLittleEndian() ? $this->convertByteOrder($bytes) : $bytes;
}
/**
* @param string $bytes
* @return string
*/
protected function convertByteOrder($bytes)
{
return strrev($bytes);
}
/**
* @param int $longInt
* @return bool
*/
protected function getLongMSB($longInt)
{
return (bool) ($longInt & 0x80000000);
}
/**
* @param string $bytes
* @return bool
*/
protected function getMSB($bytes)
{
return ord($bytes[0]) > 127;
}
/**
* @return bool
*/
protected static function isLittleEndian()
{
if (self::$isLittleEndian === null) {
$tmp = unpack('S', "\x01\x00"); // to maintain 5.3 compatibility
self::$isLittleEndian = $tmp[1] === 1;
}
return self::$isLittleEndian;
}
/**
* @param string $value
* @param int $base
* @return BigInteger
*/
protected static function getBigInteger($value, $base = 10)
{
if (!isset(self::$bigIntegers[$base])) {
self::$bigIntegers[$base] = array();
}
if (isset(self::$bigIntegers[$base][$value])) {
return self::$bigIntegers[$base][$value];
}
$integer = new BigInteger($value, $base);
self::$bigIntegers[$base][$value] = $integer;
return $integer;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Helper\BigInteger;
/**
* AMQP protocol decimal value.
*
* Values are represented as (n,e) pairs. The actual value
* is n * 10^(-e).
*
* From 0.8 spec: Decimal values are
* not intended to support floating point values, but rather
* business values such as currency rates and amounts. The
* 'decimals' octet is not signed.
*/
class AMQPDecimal
{
/** @var int */
protected $n;
/** @var int */
protected $e;
/**
* @param int $n
* @param int $e
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
*/
public function __construct($n, $e)
{
if ($e < 0) {
throw new AMQPOutOfBoundsException('Decimal exponent value must be unsigned!');
}
$this->n = $n;
$this->e = $e;
}
/**
* @return string
*/
public function asBCvalue()
{
$n = new BigInteger($this->n);
$e = new BigInteger('1' . str_repeat('0', $this->e));
list($q) = $n->divide($e);
return $q->toString();
}
/**
* @return int
*/
public function getE()
{
return $this->e;
}
/**
* @return int
*/
public function getN()
{
return $this->n;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\IO\AbstractIO;
use RuntimeException;
class AMQPIOReader extends AMQPReader
{
/** @var AbstractIO */
private $io;
/** @var int|float|null */
protected $timeout;
public function __construct(AbstractIO $io, $timeout = 0)
{
$this->io = $io;
$this->timeout = $timeout;
}
public function close(): void
{
$this->io->close();
}
/**
* @return float|int|mixed|null
*/
public function getTimeout()
{
return $this->timeout;
}
/**
* Sets the timeout (second)
*
* @param int|float|null $timeout
*/
public function setTimeout($timeout)
{
$this->timeout = $timeout;
}
/**
* @param int $n
* @return string
* @throws RuntimeException
* @throws AMQPDataReadException|AMQPNoDataException|AMQPIOException
*/
protected function rawread(int $n): string
{
$res = '';
while (true) {
$this->wait();
try {
$res = $this->io->read($n);
break;
} catch (AMQPTimeoutException $e) {
if ($this->getTimeout() > 0) {
throw $e;
}
}
}
$this->offset += $n;
return $res;
}
/**
* Waits until some data is retrieved from the socket.
*
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException when timeout is set and no data received
* @throws \PhpAmqpLib\Exception\AMQPNoDataException when no data is ready to read from IO
*/
protected function wait()
{
$timeout = $this->timeout;
if (null === $timeout) {
// timeout=null just poll state and return instantly
$sec = 0;
$usec = 0;
} elseif ($timeout > 0) {
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
} else {
// wait indefinitely for data if timeout=0
$sec = null;
$usec = 0;
}
$result = $this->io->select($sec, $usec);
if ($result === 0) {
if ($timeout > 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$timeout
));
} else {
throw new AMQPNoDataException('No data is ready to read');
}
}
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Channel\Frame;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Helper\BigInteger;
abstract class AMQPReader extends AMQPByteStream
{
/** @var int */
protected $offset = 0;
/** @var int */
protected $bitcount = 0;
/** @var int */
protected $bits = 0;
/**
* Close the byte stream.
*/
abstract public function close(): void;
abstract protected function rawread(int $n): string;
/**
* @param int $n
* @return string
*/
public function read($n)
{
$this->resetCounters();
return $this->rawread($n);
}
public function read_bit(): bool
{
if (empty($this->bitcount)) {
$this->bits = ord($this->rawread(1));
$this->bitcount = 8;
}
$result = ($this->bits & 1) === 1;
$this->bits >>= 1;
$this->bitcount--;
return $result;
}
/**
* @return int
*/
public function read_octet()
{
$this->resetCounters();
list(, $res) = unpack('C', $this->rawread(1));
return $res;
}
/**
* @return int
*/
public function read_signed_octet()
{
$this->resetCounters();
list(, $res) = unpack('c', $this->rawread(1));
return $res;
}
/**
* @return int
*/
public function read_short()
{
$this->resetCounters();
list(, $res) = unpack('n', $this->rawread(2));
return $res;
}
/**
* @return int
*/
public function read_signed_short()
{
$this->resetCounters();
list(, $res) = unpack('s', $this->correctEndianness($this->rawread(2)));
return $res;
}
/**
* Reads 32 bit integer in big-endian byte order.
*
* On 64 bit systems it will return always unsigned int
* value in 0..2^32 range.
*
* On 32 bit systems it will return signed int value in
* -2^31...+2^31 range.
*
* Use with caution!
* @return int|string
*/
public function read_php_int()
{
list(, $res) = unpack('N', $this->rawread(4));
if (self::PLATFORM_64BIT) {
return (int) sprintf('%u', $res);
}
return $res;
}
/**
* PHP does not have unsigned 32 bit int,
* so we return it as a string
*
* @return int|string
*/
public function read_long()
{
$this->resetCounters();
list(, $res) = unpack('N', $this->rawread(4));
if (!self::PLATFORM_64BIT && $this->getLongMSB($res)) {
return sprintf('%u', $res);
}
return $res;
}
/**
* @return int
*/
private function readSignedLong()
{
$this->resetCounters();
list(, $res) = unpack('l', $this->correctEndianness($this->rawread(4)));
return $res;
}
/**
* Even on 64 bit systems PHP integers are signed.
* Since we need an unsigned value here we return it as a string.
*
* @return int|string
*/
public function read_longlong()
{
$this->resetCounters();
$bytes = $this->rawread(8);
if (self::PLATFORM_64BIT) {
// we can "unpack" if MSB bit is 0 (at most 63 bit integer), fallback to BigInteger otherwise
if (!$this->getMSB($bytes)) {
$res = unpack('J', $bytes);
return $res[1];
}
} else {
// on 32-bit systems we can "unpack" up to 31 bits integer
list(, $hi, $lo) = unpack('N2', $bytes);
if ($hi === 0 && $lo > 0) {
return $lo;
}
}
$var = new BigInteger($bytes, 256);
return $var->toString();
}
/**
* @return int|string
*/
public function read_signed_longlong()
{
$this->resetCounters();
$bytes = $this->rawread(8);
if (self::PLATFORM_64BIT) {
$res = unpack('q', $this->correctEndianness($bytes));
return $res[1];
} else {
// on 32-bit systems we can "unpack" up to 31 bits integer
list(, $hi, $lo) = unpack('N2', $bytes);
if ($hi === 0 && $lo > 0) {
// positive and less than 2^31-1
return $lo;
}
// negative and more than -2^31
if ($hi === -1 && $this->getLongMSB($lo)) {
return $lo;
}
}
$var = new BigInteger($bytes, -256);
return $var->toString();
}
/**
* @return float
*/
public function read_float()
{
$this->resetCounters();
list(, $res) = unpack('G', $this->rawread(4));
return (float)$res;
}
/**
* @return float
*/
public function read_double()
{
$this->resetCounters();
list(, $res) = unpack('E', $this->rawread(8));
return (float)$res;
}
/**
* Read a utf-8 encoded string that's stored in up to
* 255 bytes. Return it decoded as a PHP unicode object.
* @return string
*/
public function read_shortstr()
{
$this->resetCounters();
list(, $slen) = unpack('C', $this->rawread(1));
return $this->rawread($slen);
}
/**
* Read a string that's up to 2**32 bytes, the encoding
* isn't specified in the AMQP spec, so just return it as
* a plain PHP string.
* @return string
*/
public function read_longstr()
{
$this->resetCounters();
$slen = $this->read_php_int();
if ($slen < 0) {
throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
}
return $this->rawread($slen);
}
/**
* Read and AMQP timestamp, which is a 64-bit integer representing
* seconds since the Unix epoch in 1-second resolution.
* @return int|string
*/
public function read_timestamp()
{
return $this->read_longlong();
}
/**
* Read an AMQP table, and return as a PHP array. keys are strings,
* values are (type,value) tuples.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPTable
*/
public function read_table(bool $returnObject = false)
{
$this->resetCounters();
$tlen = $this->read_php_int();
if ($tlen < 0) {
throw new AMQPOutOfBoundsException('Table is longer than supported');
}
$table_data = new AMQPBufferReader($this->rawread($tlen));
$result = $returnObject ? new AMQPTable() : array();
while ($table_data->tell() < $tlen) {
$name = $table_data->read_shortstr();
$ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym = $table_data->rawread(1));
$val = $table_data->read_value($ftype, $returnObject);
$returnObject ? $result->set($name, $val, $ftype) : $result[$name] = array($ftypeSym, $val);
}
return $result;
}
/**
* @return array|AMQPTable
*/
public function read_table_object()
{
return $this->read_table(true);
}
/**
* Reads the array in the next value.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPArray
*/
public function read_array($returnObject = false)
{
$this->resetCounters();
// Determine array length and its end position
$arrayLength = $this->read_php_int();
$endOffset = $this->offset + $arrayLength;
$result = $returnObject ? new AMQPArray() : array();
// Read values until we reach the end of the array
while ($this->offset < $endOffset) {
$fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
$fieldValue = $this->read_value($fieldType, $returnObject);
$returnObject ? $result->push($fieldValue, $fieldType) : $result[] = $fieldValue;
}
return $result;
}
/**
* @return array|AMQPArray
*/
public function read_array_object()
{
return $this->read_array(true);
}
/**
* @return array{type:int, channel:int, size:int}
*/
public function readFrameHeader(): array
{
return unpack('Ctype/nchannel/Nsize', $this->rawread(Frame::FRAME_HEADER_SIZE));
}
/**
* Reads the next value as the provided field type.
*
* @param int $fieldType One of AMQPAbstractCollection::T_* constants
* @param bool $collectionsAsObjects Description
* @return mixed
* @throws \PhpAmqpLib\Exception\AMQPDataReadException
*/
public function read_value(int $fieldType, bool $collectionsAsObjects = false)
{
$this->resetCounters();
switch ($fieldType) {
case AMQPAbstractCollection::T_INT_SHORTSHORT:
//according to AMQP091 spec, 'b' is not bit, it is short-short-int, also valid for rabbit/qpid
//$val=$this->read_bit();
$val = $this->read_signed_octet();
break;
case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
case AMQPAbstractCollection::T_BOOL:
$val = $this->read_octet();
break;
case AMQPAbstractCollection::T_INT_SHORT:
$val = $this->read_signed_short();
break;
case AMQPAbstractCollection::T_INT_SHORT_U:
$val = $this->read_short();
break;
case AMQPAbstractCollection::T_INT_LONG:
$val = $this->readSignedLong();
break;
case AMQPAbstractCollection::T_INT_LONG_U:
$val = $this->read_long();
break;
case AMQPAbstractCollection::T_INT_LONGLONG:
$val = $this->read_signed_longlong();
break;
case AMQPAbstractCollection::T_INT_LONGLONG_U:
$val = $this->read_longlong();
break;
case AMQPAbstractCollection::T_DECIMAL:
$e = $this->read_octet();
$n = $this->readSignedLong();
$val = new AMQPDecimal($n, $e);
break;
case AMQPAbstractCollection::T_TIMESTAMP:
$val = $this->read_timestamp();
break;
case AMQPAbstractCollection::T_STRING_SHORT:
$val = $this->read_shortstr();
break;
case AMQPAbstractCollection::T_STRING_LONG:
case AMQPAbstractCollection::T_BYTES:
$val = $this->read_longstr();
break;
case AMQPAbstractCollection::T_ARRAY:
$val = $this->read_array($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_TABLE:
$val = $this->read_table($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_VOID:
$val = null;
break;
case AMQPAbstractCollection::T_FLOAT:
$val = $this->read_float();
break;
case AMQPAbstractCollection::T_DOUBLE:
$val = $this->read_double();
break;
default:
throw new AMQPInvalidArgumentException(sprintf(
'Unsupported type "%s"',
$fieldType
));
}
return $val;
}
protected function tell(): int
{
return $this->offset;
}
protected function resetCounters(): void
{
$this->bitcount = $this->bits = 0;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception;
class AMQPTable extends AMQPAbstractCollection
{
/**
* @return int
*/
final public function getType()
{
return self::T_TABLE;
}
/**
* @param string $key
* @param mixed $val
* @param int|null $type
*/
public function set($key, $val, $type = null)
{
//https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf, https://www.rabbitmq.com/resources/specs/amqp0-8.pdf
//Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits,
// or underlines, to a maximum length of 128 characters
//The server SHOULD validate field names and upon receiving an invalid field name, it SHOULD signal a connection
// exception with reply code 503 (syntax error)
//validating length only and delegating other stuff to server, as rabbit seems to currently support numeric keys
if (!($len = strlen($key)) || ($len > 128)) {
throw new Exception\AMQPInvalidArgumentException(
'Table key must be non-empty string up to 128 chars in length'
);
}
$this->setValue($val, $type, $key);
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
use PhpAmqpLib\Helper\BigInteger;
class AMQPWriter extends AMQPByteStream
{
/** @var string */
protected $out = '';
/** @var array */
protected $bits = array();
/** @var int */
protected $bitcount = 0;
private function flushbits()
{
if (!empty($this->bits)) {
$this->out .= implode('', array_map('chr', $this->bits));
$this->bits = array();
$this->bitcount = 0;
}
}
/**
* Get what's been encoded so far.
*
* @return string
*/
public function getvalue()
{
/* temporarily needed for compatibility with write_bit unit tests */
if ($this->bitcount) {
$this->flushbits();
}
return $this->out;
}
/**
* Write a plain PHP string, with no special encoding.
*
* @param string $s
*
* @return $this
*/
public function write($s)
{
$this->out .= $s;
return $this;
}
/**
* Write a boolean value.
* (deprecated, use write_bits instead)
*
* @deprecated
* @param bool $b
* @return $this
*/
public function write_bit($b)
{
$b = $b ? 1 : 0;
$shift = $this->bitcount % 8;
$last = $shift === 0 ? 0 : array_pop($this->bits);
$last |= ($b << $shift);
$this->bits[] = $last;
$this->bitcount++;
return $this;
}
/**
* Write multiple bits as an octet
*
* @param bool[] $bits
* @return $this
*/
public function write_bits($bits)
{
$value = 0;
foreach ($bits as $n => $bit) {
$bit = $bit ? 1 : 0;
$value |= ($bit << $n);
}
$this->out .= chr($value);
return $this;
}
/**
* Write an integer as an unsigned 8-bit value
*
* @param int $n
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_octet($n)
{
if ($n < 0 || $n > 255) {
throw new AMQPInvalidArgumentException('Octet out of range: ' . $n);
}
$this->out .= chr($n);
return $this;
}
/**
* @param int $n
* @return $this
*/
public function write_signed_octet($n)
{
if (($n < -128) || ($n > 127)) {
throw new AMQPInvalidArgumentException('Signed octet out of range: ' . $n);
}
$this->out .= pack('c', $n);
return $this;
}
/**
* Write an integer as an unsigned 16-bit value
*
* @param int $n
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_short($n)
{
if ($n < 0 || $n > 65535) {
throw new AMQPInvalidArgumentException('Short out of range: ' . $n);
}
$this->out .= pack('n', $n);
return $this;
}
/**
* @param int $n
* @return $this
*/
public function write_signed_short($n)
{
if (($n < -32768) || ($n > 32767)) {
throw new AMQPInvalidArgumentException('Signed short out of range: ' . $n);
}
$this->out .= $this->correctEndianness(pack('s', $n));
return $this;
}
/**
* Write an integer as an unsigned 32-bit value
*
* @param int|string $n
* @return $this
*/
public function write_long($n)
{
if (($n < 0) || ($n > 4294967295)) {
throw new AMQPInvalidArgumentException('Long out of range: ' . $n);
}
//Numeric strings >PHP_INT_MAX on 32bit are casted to PHP_INT_MAX, damn PHP
if (!self::PLATFORM_64BIT && is_string($n)) {
$n = (float) $n;
}
$this->out .= pack('N', $n);
return $this;
}
/**
* @param int $n
* @return $this
*/
private function writeSignedLong($n)
{
if (($n < -2147483648) || ($n > 2147483647)) {
throw new AMQPInvalidArgumentException('Signed long out of range: ' . $n);
}
//on my 64bit debian this approach is slightly faster than splitIntoQuads()
$this->out .= $this->correctEndianness(pack('l', $n));
return $this;
}
/**
* Write a numeric value as an unsigned 64-bit value
*
* @param int|string $n
* @return $this
* @throws AMQPOutOfRangeException
*/
public function write_longlong($n)
{
if (is_int($n)) {
if ($n < 0) {
throw new AMQPOutOfRangeException('Longlong out of range: ' . $n);
}
if (self::PLATFORM_64BIT) {
$res = pack('J', $n);
$this->out .= $res;
} else {
$this->out .= pack('NN', 0, $n);
}
return $this;
}
$value = new BigInteger($n);
if (
$value->compare(self::getBigInteger('0')) < 0
|| $value->compare(self::getBigInteger('FFFFFFFFFFFFFFFF', 16)) > 0
) {
throw new AMQPInvalidArgumentException('Longlong out of range: ' . $n);
}
$value->setPrecision(64);
$this->out .= $value->toBytes();
return $this;
}
/**
* @param int|string $n
* @return $this
*/
public function write_signed_longlong($n)
{
if (is_int($n)) {
if (self::PLATFORM_64BIT) {
// q is for 64-bit signed machine byte order
$packed = pack('q', $n);
if (self::isLittleEndian()) {
$packed = $this->convertByteOrder($packed);
}
$this->out .= $packed;
} else {
$hi = $n < 0 ? -1 : 0;
$lo = $n;
$this->out .= pack('NN', $hi, $lo);
}
return $this;
}
$value = new BigInteger($n);
if (
$value->compare(self::getBigInteger('-8000000000000000', 16)) < 0
|| $value->compare(self::getBigInteger('7FFFFFFFFFFFFFFF', 16)) > 0
) {
throw new AMQPInvalidArgumentException('Signed longlong out of range: ' . $n);
}
$value->setPrecision(64);
$this->out .= substr($value->toBytes(true), -8);
return $this;
}
/**
* Write a string up to 255 bytes long after encoding.
* Assume UTF-8 encoding
*
* @param string $s
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_shortstr($s)
{
if ($s === null) {
$this->write_octet(0);
return $this;
}
$len = mb_strlen($s, 'ASCII');
if ($len > 255) {
throw new AMQPInvalidArgumentException('String too long');
}
$this->write_octet($len);
$this->out .= $s;
return $this;
}
/**
* Write a string up to 2**32 bytes long. Assume UTF-8 encoding
*
* @param string $s
* @return $this
*/
public function write_longstr($s)
{
if ($s === null) {
$this->write_long(0);
return $this;
}
$this->write_long(mb_strlen($s, 'ASCII'));
$this->out .= $s;
return $this;
}
/**
* Supports the writing of Array types, so that you can implement
* array methods, like Rabbitmq's HA parameters
*
* @param AMQPArray|array $a Instance of AMQPArray or PHP array WITHOUT format hints (unlike write_table())
* @return self
*/
public function write_array($a)
{
if (!($a instanceof AMQPArray)) {
$a = new AMQPArray($a);
}
$data = new self();
foreach ($a as $v) {
$data->writeValue($v[0], $v[1]);
}
$data = $data->getvalue();
$this->write_long(mb_strlen($data, 'ASCII'));
$this->write($data);
return $this;
}
/**
* Write unix time_t value as 64 bit timestamp
*
* @param int $v
* @return $this
*/
public function write_timestamp($v)
{
$this->write_longlong($v);
return $this;
}
/**
* Write PHP array, as table. Input array format: keys are strings,
* values are (type,value) tuples.
*
* @param AMQPTable|array $d Instance of AMQPTable or PHP array WITH format hints (unlike write_array())
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_table($d)
{
$typeIsSym = !($d instanceof AMQPTable); //purely for back-compat purposes
$table_data = new self();
foreach ($d as $k => $va) {
list($ftype, $v) = $va;
$table_data->write_shortstr($k);
$table_data->writeValue($typeIsSym ? AMQPAbstractCollection::getDataTypeForSymbol($ftype) : $ftype, $v);
}
$table_data = $table_data->getvalue();
$this->write_long(mb_strlen($table_data, 'ASCII'));
$this->write($table_data);
return $this;
}
/**
* for compat with method mapping used by AMQPMessage
*
* @param AMQPTable|array $d
* @return $this
*/
public function write_table_object($d)
{
return $this->write_table($d);
}
/**
* @param int $type One of AMQPAbstractCollection::T_* constants
* @param mixed $val
*/
private function writeValue($type, $val)
{
//This will find appropriate symbol for given data type for currently selected protocol
//Also will raise an exception on unknown type
$this->write(AMQPAbstractCollection::getSymbolForDataType($type));
switch ($type) {
case AMQPAbstractCollection::T_INT_SHORTSHORT:
$this->write_signed_octet($val);
break;
case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
$this->write_octet($val);
break;
case AMQPAbstractCollection::T_INT_SHORT:
$this->write_signed_short($val);
break;
case AMQPAbstractCollection::T_INT_SHORT_U:
$this->write_short($val);
break;
case AMQPAbstractCollection::T_INT_LONG:
$this->writeSignedLong($val);
break;
case AMQPAbstractCollection::T_INT_LONG_U:
$this->write_long($val);
break;
case AMQPAbstractCollection::T_INT_LONGLONG:
$this->write_signed_longlong($val);
break;
case AMQPAbstractCollection::T_INT_LONGLONG_U:
$this->write_longlong($val);
break;
case AMQPAbstractCollection::T_DECIMAL:
$this->write_octet($val->getE());
$this->writeSignedLong($val->getN());
break;
case AMQPAbstractCollection::T_TIMESTAMP:
$this->write_timestamp($val);
break;
case AMQPAbstractCollection::T_BOOL:
$this->write_octet($val ? 1 : 0);
break;
case AMQPAbstractCollection::T_STRING_SHORT:
$this->write_shortstr($val);
break;
case AMQPAbstractCollection::T_STRING_LONG:
$this->write_longstr($val);
break;
case AMQPAbstractCollection::T_ARRAY:
$this->write_array($val);
break;
case AMQPAbstractCollection::T_TABLE:
$this->write_table($val);
break;
case AMQPAbstractCollection::T_VOID:
break;
case AMQPAbstractCollection::T_BYTES:
$this->write_longstr($val);
break;
default:
throw new AMQPInvalidArgumentException(sprintf(
'Unsupported type "%s"',
$type
));
}
}
}
<?php
namespace PhpAmqpLib\Wire;
abstract class Constants
{
const VERSION = '';
const AMQP_HEADER = '';
/**
* @var array<int, string>
*/
protected static $FRAME_TYPES = array();
/**
* @var array<int, string>
*/
protected static $CONTENT_METHODS = array();
/**
* @var array<int, string>
*/
protected static $CLOSE_METHODS = array();
/**
* @var array<string, string>
*/
public static $GLOBAL_METHOD_NAMES = array();
/**
* @return string
*/
public function getHeader()
{
return static::AMQP_HEADER;
}
/**
* @param int $type
* @return bool
*/
public function isFrameType($type)
{
return array_key_exists($type, static::$FRAME_TYPES);
}
/**
* @param int $type
* @return string
*/
public function getFrameType($type)
{
return static::$FRAME_TYPES[$type];
}
/**
* @param string $method
* @return bool
*/
public function isContentMethod($method)
{
return in_array($method, static::$CONTENT_METHODS, false);
}
/**
* @param string $method
* @return bool
*/
public function isCloseMethod($method)
{
return in_array($method, static::$CLOSE_METHODS, false);
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
final class Constants080 extends Constants
{
const VERSION = '8.0';
const AMQP_HEADER = "AMQP\x01\x01\x08\x00";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
4 => 'FRAME-OOB-METHOD',
5 => 'FRAME-OOB-HEADER',
6 => 'FRAME-OOB-BODY',
7 => 'FRAME-TRACE',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
4 => '70,50',
5 => '70,70',
6 => '80,40',
7 => '80,50',
8 => '80,60',
9 => '110,10',
10 => '120,40',
11 => '120,41',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,60',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.redirect',
'10,60' => 'Connection.close',
'10,61' => 'Connection.close_ok',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,30' => 'Channel.alert',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'70,10' => 'File.qos',
'70,11' => 'File.qos_ok',
'70,20' => 'File.consume',
'70,21' => 'File.consume_ok',
'70,30' => 'File.cancel',
'70,31' => 'File.cancel_ok',
'70,40' => 'File.open',
'70,41' => 'File.open_ok',
'70,50' => 'File.stage',
'70,60' => 'File.publish',
'70,70' => 'File.return',
'70,80' => 'File.deliver',
'70,90' => 'File.ack',
'70,100' => 'File.reject',
'80,10' => 'Stream.qos',
'80,11' => 'Stream.qos_ok',
'80,20' => 'Stream.consume',
'80,21' => 'Stream.consume_ok',
'80,30' => 'Stream.cancel',
'80,31' => 'Stream.cancel_ok',
'80,40' => 'Stream.publish',
'80,50' => 'Stream.return',
'80,60' => 'Stream.deliver',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'100,10' => 'Dtx.select',
'100,11' => 'Dtx.select_ok',
'100,20' => 'Dtx.start',
'100,21' => 'Dtx.start_ok',
'110,10' => 'Tunnel.request',
'120,10' => 'Test.integer',
'120,11' => 'Test.integer_ok',
'120,20' => 'Test.string',
'120,21' => 'Test.string_ok',
'120,30' => 'Test.table',
'120,31' => 'Test.table_ok',
'120,40' => 'Test.content',
'120,41' => 'Test.content_ok',
);
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
final class Constants091 extends Constants
{
const VERSION = '0.9.1';
const AMQP_HEADER = "AMQP\x00\x00\x09\x01";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,50',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.close',
'10,51' => 'Connection.close_ok',
'10,60' => 'Connection.blocked',
'10,61' => 'Connection.unblocked',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'40,30' => 'Exchange.bind',
'40,31' => 'Exchange.bind_ok',
'40,40' => 'Exchange.unbind',
'40,51' => 'Exchange.unbind_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'60,120' => 'Basic.nack',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'85,10' => 'Confirm.select',
'85,11' => 'Confirm.select_ok',
);
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Wire\AMQPWriter;
abstract class AbstractIO
{
const BUFFER_SIZE = 8192;
/** @var null|AMQPConnectionConfig */
protected $config;
/** @var string */
protected $host;
/** @var int */
protected $port;
/** @var int|float */
protected $connection_timeout;
/** @var float */
protected $read_timeout;
/** @var float */
protected $write_timeout;
/** @var int */
protected $heartbeat;
/** @var int */
protected $initial_heartbeat;
/** @var bool */
protected $keepalive;
/** @var int|float */
protected $last_read;
/** @var int|float */
protected $last_write;
/** @var array|null */
protected $last_error;
/** @var bool */
protected $canDispatchPcntlSignal = false;
/**
* @param int $len
* @return string
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
*/
abstract public function read($len);
/**
* @param string $data
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
*/
abstract public function write($data);
/**
* @return void
*/
abstract public function close();
/**
* @param int|null $sec
* @param int $usec
* @return int
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function select(?int $sec, int $usec = 0)
{
$this->check_heartbeat();
$this->setErrorHandler();
try {
$result = $this->do_select($sec, $usec);
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPIOWaitException($e->getMessage(), $e->getCode(), $e);
} finally {
$this->restoreErrorHandler();
}
if ($this->canDispatchPcntlSignal) {
pcntl_signal_dispatch();
}
// no exception and false result - either timeout or signal was sent
if ($result === false) {
$result = 0;
}
return $result;
}
/**
* @param int|null $sec
* @param int $usec
* @return int|bool
* @throws AMQPConnectionClosedException
*/
abstract protected function do_select(?int $sec, int $usec);
/**
* Set ups the connection.
* @return void
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
abstract public function connect();
/**
* Set connection params connection tune(negotiation).
* @param int $heartbeat
*/
public function afterTune(int $heartbeat): void
{
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
}
/**
* Heartbeat logic: check connection health here
* @return void
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function check_heartbeat()
{
// ignore unless heartbeat interval is set
if ($this->heartbeat !== 0 && $this->last_read > 0 && $this->last_write > 0) {
// server has gone away
$this->checkBrokerHeartbeat();
// time for client to send a heartbeat
$now = microtime(true);
if (($this->heartbeat / 2) < $now - $this->last_write) {
$this->write_heartbeat();
}
}
}
/**
* @throws \PhpAmqpLib\Exception\AMQPHeartbeatMissedException
*/
protected function checkBrokerHeartbeat()
{
if ($this->heartbeat > 0 && ($this->last_read > 0 || $this->last_write > 0)) {
$lastActivity = $this->getLastActivity();
$now = microtime(true);
if (($now - $lastActivity) > $this->heartbeat * 2 + 1) {
$this->close();
throw new AMQPHeartbeatMissedException('Missed server heartbeat');
}
}
}
/**
* @return float|int
*/
public function getLastActivity()
{
return max($this->last_read, $this->last_write);
}
public function getReadTimeout(): float
{
return $this->read_timeout;
}
/**
* @return $this
*/
public function disableHeartbeat()
{
$this->initial_heartbeat = $this->heartbeat;
$this->heartbeat = 0;
return $this;
}
/**
* @return $this
*/
public function reenableHeartbeat()
{
$this->heartbeat = $this->initial_heartbeat;
return $this;
}
/**
* Sends a heartbeat message
*/
protected function write_heartbeat()
{
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->write($pkt->getvalue());
}
/**
* Begin tracking errors and set the error handler
*/
protected function setErrorHandler(): void
{
$this->last_error = null;
set_error_handler(array($this, 'error_handler'));
}
protected function throwOnError(): void
{
if ($this->last_error !== null) {
throw new \ErrorException(
$this->last_error['errstr'],
0,
$this->last_error['errno'],
$this->last_error['errfile'],
$this->last_error['errline']
);
}
}
protected function restoreErrorHandler(): void
{
restore_error_handler();
}
/**
* Internal error handler to deal with stream and socket errors.
*
* @param int $errno
* @param string $errstr
* @param string $errfile
* @param int $errline
* @param array $errcontext
* @return void
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
// throwing an exception in an error handler will halt execution
// set the last error and continue
$this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
}
/**
* @return bool
*/
protected function isPcntlSignalEnabled()
{
return extension_loaded('pcntl')
&& function_exists('pcntl_signal_dispatch')
&& (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true);
}
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPSocketException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Helper\SocketConstants;
class SocketIO extends AbstractIO
{
/** @var null|resource */
private $sock;
/**
* @param string $host
* @param int $port
* @param int|float $read_timeout
* @param bool $keepalive
* @param int|float|null $write_timeout if null defaults to read timeout
* @param int $heartbeat how often to send heartbeat. 0 means off
* @param null|AMQPConnectionConfig $config
*/
public function __construct(
$host,
$port,
$read_timeout = 3,
$keepalive = false,
$write_timeout = null,
$heartbeat = 0,
?AMQPConnectionConfig $config = null
) {
$this->config = $config;
$this->host = $host;
$this->port = $port;
$this->read_timeout = (float)$read_timeout;
$this->write_timeout = (float)($write_timeout ?: $read_timeout);
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
$this->keepalive = $keepalive;
$this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
/*
TODO FUTURE enable this check
php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
}
if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
}
*/
}
/**
* @inheritdoc
*/
public function connect()
{
$this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
$this->setErrorHandler();
try {
$connected = socket_connect($this->sock, $this->host, $this->port);
$this->throwOnError();
} catch (\ErrorException $e) {
$connected = false;
} finally {
$this->restoreErrorHandler();
}
if (!$connected) {
$errno = socket_last_error($this->sock);
$errstr = socket_strerror($errno);
throw new AMQPIOException(sprintf(
'Error Connecting to server (%s): %s',
$errno,
$errstr
), $errno);
}
socket_set_block($this->sock);
socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
if ($this->config && $this->config->getSendBufferSize() > 0) {
socket_set_option($this->sock, SOL_SOCKET, SO_SNDBUF, $this->config->getSendBufferSize());
}
if ($this->keepalive) {
$this->enable_keepalive();
}
$this->heartbeat = $this->initial_heartbeat;
}
/**
* @inheritdoc
*/
public function getSocket()
{
return $this->sock;
}
/**
* @inheritdoc
*/
public function read($len)
{
if (is_null($this->sock)) {
throw new AMQPSocketException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$this->check_heartbeat();
list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
$read_start = microtime(true);
$read = 0;
$data = '';
while ($read < $len) {
$buffer = null;
$result = socket_recv($this->sock, $buffer, $len - $read, 0);
if ($result === 0) {
// From linux recv() manual:
// When a stream socket peer has performed an orderly shutdown,
// the return value will be 0 (the traditional "end-of-file" return).
// http://php.net/manual/en/function.socket-recv.php#47182
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
if (empty($buffer)) {
$read_now = microtime(true);
$t_read = $read_now - $read_start;
if ($t_read > $this->read_timeout) {
throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
}
$this->select($timeout_sec, $timeout_uSec);
continue;
}
$read += mb_strlen($buffer, 'ASCII');
$data .= $buffer;
}
if (mb_strlen($data, 'ASCII') !== $len) {
throw new AMQPIOException(sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($data, 'ASCII'),
$len
));
}
$this->last_read = microtime(true);
return $data;
}
/**
* @inheritdoc
*/
public function write($data)
{
// Null sockets are invalid, throw exception
if (is_null($this->sock)) {
throw new AMQPSocketException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$this->checkBrokerHeartbeat();
$written = 0;
$len = mb_strlen($data, 'ASCII');
$write_start = microtime(true);
while ($written < $len) {
$this->setErrorHandler();
try {
$this->select_write();
$buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
$result = socket_write($this->sock, $buffer);
$this->throwOnError();
} catch (\ErrorException $e) {
$code = socket_last_error($this->sock);
$constants = SocketConstants::getInstance();
switch ($code) {
case $constants->SOCKET_EPIPE:
case $constants->SOCKET_ENETDOWN:
case $constants->SOCKET_ENETUNREACH:
case $constants->SOCKET_ENETRESET:
case $constants->SOCKET_ECONNABORTED:
case $constants->SOCKET_ECONNRESET:
case $constants->SOCKET_ECONNREFUSED:
case $constants->SOCKET_ETIMEDOUT:
$this->close();
throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
default:
throw new AMQPIOException(sprintf(
'Error sending data. Last SocketError: %s',
socket_strerror($code)
), $code, $e);
}
} finally {
$this->restoreErrorHandler();
}
if ($result === false) {
throw new AMQPIOException(sprintf(
'Error sending data. Last SocketError: %s',
socket_strerror(socket_last_error($this->sock))
));
}
$now = microtime(true);
if ($result > 0) {
$this->last_write = $write_start = $now;
$written += $result;
} else {
if (($now - $write_start) > $this->write_timeout) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
}
}
}
/**
* @inheritdoc
*/
public function close()
{
$this->disableHeartbeat();
if (is_resource($this->sock) || is_a($this->sock, \Socket::class)) {
socket_close($this->sock);
}
$this->sock = null;
$this->last_read = 0;
$this->last_write = 0;
}
/**
* @inheritdoc
*/
protected function do_select(?int $sec, int $usec)
{
if (!is_resource($this->sock) && !is_a($this->sock, \Socket::class)) {
$this->sock = null;
throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
}
$read = array($this->sock);
$write = null;
$except = null;
return socket_select($read, $write, $except, $sec, $usec);
}
/**
* @return int|bool
*/
protected function select_write()
{
$read = $except = null;
$write = array($this->sock);
return socket_select($read, $write, $except, 0, 100000);
}
/**
* @throws \PhpAmqpLib\Exception\AMQPIOException
*/
protected function enable_keepalive()
{
if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
}
socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
}
/**
* @inheritdoc
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
$constants = SocketConstants::getInstance();
// socket_select warning that it has been interrupted by a signal - EINTR
if (isset($constants->SOCKET_EINTR) && false !== strrpos($errstr, socket_strerror($constants->SOCKET_EINTR))) {
// it's allowed while processing signals
return;
}
parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
}
/**
* @inheritdoc
*/
protected function setErrorHandler(): void
{
parent::setErrorHandler();
socket_clear_error($this->sock);
}
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Helper\SocketConstants;
class StreamIO extends AbstractIO
{
/** @var string */
protected $protocol;
/** @var null|resource */
protected $context;
/** @var null|resource */
private $sock;
/**
* @param string $host
* @param int $port
* @param float $connection_timeout
* @param float $read_write_timeout
* @param resource|array|null $context
* @param bool $keepalive
* @param int $heartbeat
* @param string|null $ssl_protocol
*/
public function __construct(
$host,
$port,
$connection_timeout,
$read_write_timeout,
$context = null,
$keepalive = false,
$heartbeat = 0,
$ssl_protocol = null
) {
// TODO FUTURE change comparison to <=
// php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
/*
TODO FUTURE enable this check
if ($heartbeat !== 0 && ($read_write_timeout < ($heartbeat * 2))) {
throw new \InvalidArgumentException('read_write_timeout must be at least 2x the heartbeat');
}
*/
if (!is_resource($context) || get_resource_type($context) !== 'stream-context') {
$context = stream_context_create();
}
$this->protocol = 'tcp';
$this->host = $host;
$this->port = $port;
$this->connection_timeout = $connection_timeout;
$this->read_timeout = (float)$read_write_timeout;
$this->write_timeout = (float)$read_write_timeout;
$this->context = $context;
$this->keepalive = $keepalive;
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
$this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
$options = stream_context_get_options($this->context);
if (!empty($options['ssl'])) {
if (isset($ssl_protocol)) {
$this->protocol = $ssl_protocol;
} else {
$this->protocol = 'ssl';
}
}
}
/**
* @inheritdoc
*/
public function connect()
{
$errstr = $errno = null;
$remote = sprintf(
'%s://%s:%s',
$this->protocol,
$this->host,
$this->port
);
$this->setErrorHandler();
try {
$this->sock = stream_socket_client(
$remote,
$errno,
$errstr,
$this->connection_timeout,
STREAM_CLIENT_CONNECT,
$this->context
);
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPIOException($e->getMessage());
} finally {
$this->restoreErrorHandler();
}
if (false === $this->sock) {
throw new AMQPIOException(
sprintf(
'Error Connecting to server(%s): %s ',
$errno,
$errstr
),
$errno
);
}
if (!stream_socket_get_name($this->sock, true)) {
throw new AMQPIOException(
sprintf(
'Connection refused: %s ',
$remote
)
);
}
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
if (!stream_set_timeout($this->sock, $sec, $uSec)) {
throw new AMQPIOException('Timeout could not be set');
}
// php cannot capture signals while streams are blocking
if ($this->canDispatchPcntlSignal) {
stream_set_blocking($this->sock, 0);
stream_set_write_buffer($this->sock, 0);
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($this->sock, 0);
}
} else {
stream_set_blocking($this->sock, true);
}
if ($this->keepalive) {
$this->enable_keepalive();
}
$this->heartbeat = $this->initial_heartbeat;
}
/**
* @inheritdoc
*/
public function read($len)
{
$this->check_heartbeat();
list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
$read_start = microtime(true);
$read = 0;
$data = '';
while ($read < $len) {
if (!is_resource($this->sock) || feof($this->sock)) {
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
$this->setErrorHandler();
try {
$buffer = fread($this->sock, ($len - $read));
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
} finally {
$this->restoreErrorHandler();
}
if ($buffer === false) {
throw new AMQPDataReadException('Error receiving data');
}
if ($buffer === '') {
$read_now = microtime(true);
$t_read = $read_now - $read_start;
if ($t_read > $this->read_timeout) {
throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
}
$this->select($timeout_sec, $timeout_uSec);
continue;
}
$this->last_read = microtime(true);
$read_start = $this->last_read;
$read += mb_strlen($buffer, 'ASCII');
$data .= $buffer;
}
if (mb_strlen($data, 'ASCII') !== $len) {
throw new AMQPDataReadException(
sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($data, 'ASCII'),
$len
)
);
}
$this->last_read = microtime(true);
return $data;
}
/**
* @inheritdoc
*/
public function write($data)
{
$this->checkBrokerHeartbeat();
$written = 0;
$len = mb_strlen($data, 'ASCII');
$write_start = microtime(true);
while ($written < $len) {
if (!is_resource($this->sock) || feof($this->sock)) {
$this->close();
$constants = SocketConstants::getInstance();
throw new AMQPConnectionClosedException('Broken pipe or closed connection', $constants->SOCKET_EPIPE);
}
$result = false;
$this->setErrorHandler();
// OpenSSL's C library function SSL_write() can balk on buffers > 8192
// bytes in length, so we're limiting the write size here. On both TLS
// and plaintext connections, the write loop will continue until the
// buffer has been fully written.
// This behavior has been observed in OpenSSL dating back to at least
// September 2002:
// http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
try {
// check stream and prevent from high CPU usage
$this->select_write();
$buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
$result = fwrite($this->sock, $buffer);
$this->throwOnError();
} catch (\ErrorException $e) {
$code = $this->last_error['errno'];
$constants = SocketConstants::getInstance();
switch ($code) {
case $constants->SOCKET_EPIPE:
case $constants->SOCKET_ENETDOWN:
case $constants->SOCKET_ENETUNREACH:
case $constants->SOCKET_ENETRESET:
case $constants->SOCKET_ECONNABORTED:
case $constants->SOCKET_ECONNRESET:
case $constants->SOCKET_ECONNREFUSED:
case $constants->SOCKET_ETIMEDOUT:
$this->close();
throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
default:
throw new AMQPRuntimeException($e->getMessage(), $code, $e);
}
} finally {
$this->restoreErrorHandler();
}
if ($result === false) {
throw new AMQPRuntimeException('Error sending data');
}
if ($this->timed_out()) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
$now = microtime(true);
if ($result > 0) {
$this->last_write = $write_start = $now;
$written += $result;
} else {
if (feof($this->sock)) {
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
if (($now - $write_start) > $this->write_timeout) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
}
}
}
/**
* @inheritdoc
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
$code = $this->extract_error_code($errstr);
$constants = SocketConstants::getInstance();
switch ($code) {
// fwrite notice that the stream isn't ready - EAGAIN or EWOULDBLOCK
case $constants->SOCKET_EAGAIN:
case $constants->SOCKET_EWOULDBLOCK:
// stream_select warning that it has been interrupted by a signal - EINTR
case $constants->SOCKET_EINTR:
return;
}
parent::error_handler($code > 0 ? $code : $errno, $errstr, $errfile, $errline, $errcontext);
}
public function close()
{
$this->disableHeartbeat();
if (is_resource($this->sock)) {
fclose($this->sock);
}
$this->sock = null;
$this->last_read = 0;
$this->last_write = 0;
}
/**
* @inheritdoc
*/
public function getSocket()
{
return $this->sock;
}
/**
* @inheritdoc
*/
protected function do_select(?int $sec, int $usec)
{
if ($this->sock === null || !is_resource($this->sock)) {
$this->sock = null;
throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
}
$read = array($this->sock);
$write = null;
$except = null;
if ($sec === null && PHP_VERSION_ID >= 80100) {
$usec = 0;
}
return stream_select($read, $write, $except, $sec, $usec);
}
/**
* @return int|bool
*/
protected function select_write()
{
$read = $except = null;
$write = array($this->sock);
return stream_select($read, $write, $except, 0, 100000);
}
/**
* @return mixed
*/
protected function timed_out()
{
// get status of socket to determine whether or not it has timed out
$info = stream_get_meta_data($this->sock);
return $info['timed_out'];
}
/**
* @throws \PhpAmqpLib\Exception\AMQPIOException
*/
protected function enable_keepalive()
{
if ($this->protocol === 'ssl') {
throw new AMQPIOException('Can not enable keepalive: ssl connection does not support keepalive (#70939)');
}
if ($this->protocol === 'tls') {
throw new AMQPIOException('Can not enable keepalive: tls connection does not support keepalive (#70939)');
}
if (!function_exists('socket_import_stream')) {
throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
}
if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
}
$socket = socket_import_stream($this->sock);
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
}
/**
* @param string $message
* @return int
*/
protected function extract_error_code($message)
{
if (0 === strpos($message, 'stream_select():')) {
$pattern = '/\s+\[(\d+)\]:\s+/';
} else {
$pattern = '/\s+errno=(\d+)\s+/';
}
$matches = array();
$result = preg_match($pattern, $message, $matches);
if ($result > 0) {
return (int)$matches[1];
}
return 0;
}
}
# php-amqplib #
![PHPUnit tests](https://github.com/php-amqplib/php-amqplib/workflows/PHPUnit%20tests/badge.svg)
[![Latest Version on Packagist][ico-version]][link-packagist]
[![Total Downloads][ico-downloads]][link-downloads]
[![Software License][ico-license]](LICENSE)
[![codecov](https://codecov.io/gh/php-amqplib/php-amqplib/branch/master/graph/badge.svg?token=tgeYkUsaDM)](https://codecov.io/gh/php-amqplib/php-amqplib)
[![Coverage Status][ico-scrutinizer]][link-scrutinizer]
[![Quality Score][ico-code-quality]][link-code-quality]
This library is a _pure PHP_ implementation of the [AMQP 0-9-1 protocol](http://www.rabbitmq.com/tutorials/amqp-concepts.html).
It's been tested against [RabbitMQ](http://www.rabbitmq.com/).
The library was used for the PHP examples of [RabbitMQ in Action](http://manning.com/videla/) and the [official RabbitMQ tutorials](http://www.rabbitmq.com/tutorials/tutorial-one-php.html).
Please note that this project is released with a [Contributor Code of Conduct](.github/CODE_OF_CONDUCT.md). By participating in this project you agree to abide by its terms.
## Project Maintainers
Thanks to [videlalvaro](https://github.com/videlalvaro) and [postalservice14](https://github.com/postalservice14) for creating `php-amqplib`.
The package is now maintained by [Ramūnas Dronga](https://github.com/ramunasd), [Luke Bakken](https://github.com/lukebakken) and several VMware engineers working on RabbitMQ.
## Supported RabbitMQ Versions ##
Starting with version 2.0 this library uses `AMQP 0.9.1` by default and thus requires [RabbitMQ 2.0 or later version](http://www.rabbitmq.com/download.html).
Usually server upgrades do not require any application code changes since
the protocol changes very infrequently but please conduct your own testing before upgrading.
## Supported RabbitMQ Extensions ##
Since the library uses `AMQP 0.9.1` we added support for the following RabbitMQ extensions:
* Exchange to Exchange Bindings
* Basic Nack
* Publisher Confirms
* Consumer Cancel Notify
Extensions that modify existing methods like `alternate exchanges` are also supported.
### Related libraries
* [enqueue/amqp-lib](https://github.com/php-enqueue/amqp-lib) is a [amqp interop](https://github.com/queue-interop/queue-interop#amqp-interop) compatible wrapper.
* [AMQProxy](https://github.com/cloudamqp/amqproxy) is a proxy library with connection and channel pooling/reusing. This allows for lower connection and channel churn when using php-amqplib, leading to less CPU usage of RabbitMQ.
## Setup ##
Ensure you have [composer](http://getcomposer.org) installed, then run the following command:
```bash
$ composer require php-amqplib/php-amqplib
```
That will fetch the library and its dependencies inside your vendor folder. Then you can add the following to your
.php files in order to use the library
```php
require_once __DIR__.'/vendor/autoload.php';
```
Then you need to `use` the relevant classes, for example:
```php
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
```
## Usage ##
With RabbitMQ running open two Terminals and on the first one execute the following commands to start the consumer:
```bash
$ cd php-amqplib/demo
$ php amqp_consumer.php
```
Then on the other Terminal do:
```bash
$ cd php-amqplib/demo
$ php amqp_publisher.php some text to publish
```
You should see the message arriving to the process on the other Terminal
Then to stop the consumer, send to it the `quit` message:
```bash
$ php amqp_publisher.php quit
```
If you need to listen to the sockets used to connect to RabbitMQ then see the example in the non blocking consumer.
```bash
$ php amqp_consumer_non_blocking.php
```
## Change log
Please see [CHANGELOG](CHANGELOG.md) for more information what has changed recently.
## API Documentation ##
http://php-amqplib.github.io/php-amqplib/
## Tutorials ##
To not repeat ourselves, if you want to learn more about this library,
please refer to the [official RabbitMQ tutorials](http://www.rabbitmq.com/tutorials/tutorial-one-php.html).
## More Examples ##
- `amqp_ha_consumer.php`: demos the use of mirrored queues.
- `amqp_consumer_exclusive.php` and `amqp_publisher_exclusive.php`: demos fanout exchanges using exclusive queues.
- `amqp_consumer_fanout_{1,2}.php` and `amqp_publisher_fanout.php`: demos fanout exchanges with named queues.
- `amqp_consumer_pcntl_heartbeat.php`: demos signal-based heartbeat sender usage.
- `basic_get.php`: demos obtaining messages from the queues by using the _basic get_ AMQP call.
## Multiple hosts connections ##
If you have a cluster of multiple nodes to which your application can connect,
you can start a connection with an array of hosts. To do that you should use
the `create_connection` static method.
For example:
```php
$connection = AMQPStreamConnection::create_connection([
['host' => HOST1, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST],
['host' => HOST2, 'port' => PORT, 'user' => USER, 'password' => PASS, 'vhost' => VHOST]
],
$options);
```
This code will try to connect to `HOST1` first, and connect to `HOST2` if the
first connection fails. The method returns a connection object for the first
successful connection. Should all connections fail it will throw the exception
from the last connection attempt.
See `demo/amqp_connect_multiple_hosts.php` for more examples.
## Batch Publishing ##
Let's say you have a process that generates a bunch of messages that are going to be published to the same `exchange` using the same `routing_key` and options like `mandatory`.
Then you could make use of the `batch_basic_publish` library feature. You can batch messages like this:
```php
$msg = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg, $exchange);
$msg2 = new AMQPMessage($msg_body);
$ch->batch_basic_publish($msg2, $exchange);
```
and then send the batch like this:
```php
$ch->publish_batch();
```
### When do we publish the message batch? ###
Let's say our program needs to read from a file and then publish one message per line. Depending on the message size, you will have to decide when it's better to send the batch.
You could send it every 50 messages, or every hundred. That's up to you.
## Optimized Message Publishing ##
Another way to speed up your message publishing is by reusing the `AMQPMessage` message instances. You can create your new message like this:
```php
$properties = array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT);
$msg = new AMQPMessage($body, $properties);
$ch->basic_publish($msg, $exchange);
```
Now let's say that while you want to change the message body for future messages, you will keep the same properties, that is, your messages will still be `text/plain` and the `delivery_mode` will still be `AMQPMessage::DELIVERY_MODE_PERSISTENT`. If you create a new `AMQPMessage` instance for every published message, then those properties would have to be re-encoded in the AMQP binary format. You could avoid all that by just reusing the `AMQPMessage` and then resetting the message body like this:
```php
$msg->setBody($body2);
$ch->basic_publish($msg, $exchange);
```
## Truncating Large Messages ##
AMQP imposes no limit on the size of messages; if a very large message is received by a consumer, PHP's memory limit may be reached
within the library before the callback passed to `basic_consume` is called.
To avoid this, you can call the method `AMQPChannel::setBodySizeLimit(int $bytes)` on your Channel instance. Body sizes exceeding this limit will be truncated,
and delivered to your callback with a `AMQPMessage::$is_truncated` flag set to `true`. The property `AMQPMessage::$body_size` will reflect the true body size of
a received message, which will be higher than `strlen(AMQPMessage::getBody())` if the message has been truncated.
Note that all data above the limit is read from the AMQP Channel and immediately discarded, so there is no way to retrieve it within your
callback. If you have another consumer which can handle messages with larger payloads, you can use `basic_reject` or `basic_nack` to tell
the server (which still has a complete copy) to forward it to a Dead Letter Exchange.
By default, no truncation will occur. To disable truncation on a Channel that has had it enabled, pass `0` (or `null`) to `AMQPChannel::setBodySizeLimit()`.
## Connection recovery ##
Some RabbitMQ clients using automated connection recovery mechanisms to reconnect
and recover channels and consumers in case of network errors.
Since this client is using a single-thread, you can set up connection recovery
using exception handling mechanism.
Exceptions which might be thrown in case of connection errors:
```php
PhpAmqpLib\Exception\AMQPConnectionClosedException
PhpAmqpLib\Exception\AMQPIOException
\RuntimeException
\ErrorException
```
Some other exceptions might be thrown, but connection can still be there. It's
always a good idea to clean up an old connection when handling an exception
before reconnecting.
For example, if you want to set up a recovering connection:
```php
$connection = null;
$channel = null;
while(true){
try {
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
// Your application code goes here.
do_something_with_connection($connection);
} catch(AMQPRuntimeException $e) {
echo $e->getMessage();
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\RuntimeException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
} catch(\ErrorException $e) {
cleanup_connection($connection);
usleep(WAIT_BEFORE_RECONNECT_uS);
}
}
```
A full example is in `demo/connection_recovery_consume.php`.
This code will reconnect and retry the application code every time the
exception occurs. Some exceptions can still be thrown and should not be handled
as a part of reconnection process, because they might be application errors.
This approach makes sense mostly for consumer applications, producers will
require some additional application code to avoid publishing the same message
multiple times.
This was a simplest example, in a real-life application you might want to
control retr count and maybe gracefully degrade wait time to reconnection.
You can find a more excessive example in [#444](https://github.com/php-amqplib/php-amqplib/issues/444)
## UNIX Signals ##
If you have installed [PCNTL extension](http://www.php.net/manual/en/book.pcntl.php) dispatching of signal will be handled when consumer is not processing message.
```php
$pcntlHandler = function ($signal) {
switch ($signal) {
case \SIGTERM:
case \SIGUSR1:
case \SIGINT:
// some stuff before stop consumer e.g. delete lock etc
pcntl_signal($signal, SIG_DFL); // restore handler
posix_kill(posix_getpid(), $signal); // kill self with signal, see https://www.cons.org/cracauer/sigint.html
case \SIGHUP:
// some stuff to restart consumer
break;
default:
// do nothing
}
};
pcntl_signal(\SIGTERM, $pcntlHandler);
pcntl_signal(\SIGINT, $pcntlHandler);
pcntl_signal(\SIGUSR1, $pcntlHandler);
pcntl_signal(\SIGHUP, $pcntlHandler);
```
To disable this feature just define constant `AMQP_WITHOUT_SIGNALS` as `true`
```php
<?php
define('AMQP_WITHOUT_SIGNALS', true);
... more code
```
## Signal-based Heartbeat ##
If you have installed [PCNTL extension](http://www.php.net/manual/en/book.pcntl.php) and are using PHP 7.1 or greater,
you can register a signal-based heartbeat sender.
```php
<?php
$sender = new PCNTLHeartbeatSender($connection);
$sender->register();
... code
$sender->unregister();
```
## Debugging ##
If you want to know what's going on at a protocol level then add the following constant to your code:
```php
<?php
define('AMQP_DEBUG', true);
... more code
?>
```
## Benchmarks ##
To run the publishing/consume benchmark type:
```bash
$ make benchmark
```
## Tests ##
To successfully run the tests you need to first have a stock RabbitMQ broker running locally.Then, run tests like this:
```bash
$ make test
```
## Contributing
Please see [CONTRIBUTING](CONTRIBUTING.md) for details.
## Using AMQP 0.8 ##
If you still want to use the old version of the protocol then you can do it by setting the following constant in your configuration code:
```php
define('AMQP_PROTOCOL', '0.8');
```
The default value is `'0.9.1'`.
## Providing your own autoloader ##
If for some reason you don't want to use composer, then you need to have an autoloader in place fo the library classes. People have [reported](https://github.com/videlalvaro/php-amqplib/issues/61#issuecomment-37855050) to use this [autoloader](https://gist.github.com/jwage/221634) with success.
## Original README: ##
Below is the original README file content. Credits goes to the original authors.
PHP library implementing Advanced Message Queuing Protocol (AMQP).
The library is port of python code of py-amqplib
http://barryp.org/software/py-amqplib/
It have been tested with RabbitMQ server.
Project home page: http://code.google.com/p/php-amqplib/
For discussion, please join the group:
http://groups.google.com/group/php-amqplib-devel
For bug reports, please use bug tracking system at the project page.
Patches are very welcome!
Author: Vadim Zaliva <lord@crocodile.org>
[ico-version]: https://img.shields.io/packagist/v/php-amqplib/php-amqplib.svg?style=flat-square
[ico-license]: https://img.shields.io/badge/license-LGPL_2.1-brightgreen.svg?style=flat-square
[ico-scrutinizer]: https://img.shields.io/scrutinizer/coverage/g/php-amqplib/php-amqplib.svg?style=flat-square
[ico-code-quality]: https://img.shields.io/scrutinizer/g/php-amqplib/php-amqplib.svg?style=flat-square
[ico-downloads]: https://img.shields.io/packagist/dt/php-amqplib/php-amqplib.svg?style=flat-square
[link-packagist]: https://packagist.org/packages/php-amqplib/php-amqplib
[link-scrutinizer]: https://scrutinizer-ci.com/g/php-amqplib/php-amqplib/code-structure
[link-code-quality]: https://scrutinizer-ci.com/g/php-amqplib/php-amqplib
[link-downloads]: https://packagist.org/packages/php-amqplib/php-amqplib
[link-author]: https://github.com/php-amqplib
[link-contributors]: ../../contributors
coverage:
status:
project:
default:
target: auto
threshold: 0.5
{
"name": "php-amqplib/php-amqplib",
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"type": "library",
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"keywords": ["rabbitmq", "message", "queue"],
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
},
{
"name": "Luke Bakken",
"email": "luke@bakken.io",
"role": "Maintainer"
},
{
"name": "Ramūnas Dronga",
"email": "github@ramuno.lt",
"role": "Maintainer"
}
],
"require": {
"php": "^7.1||^8.0",
"ext-sockets": "*",
"ext-mbstring": "*",
"phpseclib/phpseclib": "^2.0|^3.0"
},
"require-dev": {
"ext-curl": "*",
"phpunit/phpunit": "^7.5|^9.5",
"squizlabs/php_codesniffer": "^3.6",
"nategood/httpful": "^0.2.20"
},
"conflict": {
"php": "7.4.0 - 7.4.1"
},
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"autoload-dev": {
"psr-4": {
"PhpAmqpLib\\Tests\\Functional\\": "tests/Functional",
"PhpAmqpLib\\Tests\\Unit\\": "tests/Unit"
}
},
"license": "LGPL-2.1-or-later",
"extra": {
"branch-alias": {
"dev-master": "3.0-dev"
}
}
}
<?xml version="1.0" encoding="UTF-8"?>
<ruleset xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="vendor/squizlabs/php_codesniffer/phpcs.xsd">
<arg name="basepath" value="."/>
<arg name="cache" value=".phpcs-cache"/>
<arg name="colors"/>
<arg value="p"/>
<arg name="extensions" value="php"/>
<arg name="tab-width" value="4"/>
<arg name="report-width" value="120"/>
<rule ref="PEAR.Functions.ValidDefaultValue">
<!-- We should preserve BC with possible child classes until next major version -->
<exclude-pattern>PhpAmqpLib/Helper/Protocol/Protocol091.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/Protocol/Protocol080.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AbstractConnection.php</exclude-pattern>
</rule>
<rule ref="PSR1.Methods.CamelCapsMethodName.NotCamelCaps">
<!-- We should preserve BC with possible child classes
and public method usage until next major version -->
<exclude-pattern>PhpAmqpLib/Wire/AMQPReader.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Wire/IO/SocketIO.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Wire/IO/StreamIO.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Wire/IO/AbstractIO.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Wire/AMQPWriter.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPSocketConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPSSLConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPStreamConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPLazyConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPLazySocketConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AMQPLazySSLConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Connection/AbstractConnection.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Message/AMQPMessage.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/Protocol/Wait091.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/Protocol/MethodMap091.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/Protocol/Wait080.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/Protocol/MethodMap080.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/DebugHelper.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Helper/MiscHelper.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Channel/AMQPChannel.php</exclude-pattern>
<exclude-pattern>PhpAmqpLib/Channel/AbstractChannel.php</exclude-pattern>
</rule>
<file>PhpAmqpLib/</file>
</ruleset>
phpseclib Lead Developer: TerraFrost (Jim Wigginton)
phpseclib Developers: monnerat (Patrick Monnerat)
bantu (Andreas Fischer)
petrich (Hans-Jürgen Petrich)
GrahamCampbell (Graham Campbell)
hc-jworman
\ No newline at end of file
# Backers
phpseclib ongoing development is made possible by [Tidelift](https://tidelift.com/subscription/pkg/packagist-phpseclib-phpseclib?utm_source=packagist-phpseclib-phpseclib&utm_medium=referral&utm_campaign=readme) and by contributions by users like you. Thank you.
## Backers
- Allan Simon
- [ChargeOver](https://chargeover.com/)
- Raghu Veer Dendukuri
- Zane Hooper
- [Setasign](https://www.setasign.com/)
- [Charles Severance](https://github.com/csev)
- [Rachel Fish](https://github.com/itsrachelfish)
- Tharyrok
\ No newline at end of file