Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Saransaran/php-class-project
  • sibidharan/php-class-project
  • Madhan1024/php-class-project
  • GopiKrishnan/photogram
  • Mhd_khalid/php-class-project
  • At_muthu__/php-class-project
  • jaganbhaskar155/php-class-project
  • hariharanrd/php-class-project
  • jasper715/php-class-project
  • hanuRakesh/photogram-project-main
  • Yuvaraj21/photogram
  • ram_rogers/php-class-project
  • Hihelloboy/php-class-project
  • Nadarajan/php-class-project
  • srisanthosh156/php-class-project
  • Buvaneshwaran.k/php-class-project
  • umarfarooq07/php-class-project
  • Dhanaprakash/php-class-project
  • jashwanth142003/php-class-project
  • Esakkiraja/php-class-project
  • Boomi/php-class-project
  • Kishore2071/php-class-project
  • Ram123raj/php-class-project
  • aswinkumar27/php-class-project
  • dhilipdhilip9655/php-class-project
  • Manikandam143/php-class-project
  • VikramS/php-class-project
  • ArnoldSam/php-class-project
  • gowthamapandi0008/php-class-project
  • d.barath7639/php-class-project
  • shyalandran/php-class-project
  • kiruba_432/php-class-project
  • razakias001/php-class-project
  • kannan.b2745/php-class-project
  • sathish236tsk/php-class-project
  • rii/php-class-project
  • jonathajh4k/php-class-project
  • Neelagandan_G/php-class-project
  • Tholkappiar2003/php-class-project
  • kamaleshselvam75/php-class-project
  • devapriyan/php-class-project
  • sanojahamed/php-class-project
  • rizwankendo/php-class-project
  • senthamilselvan18000/php-class-project
  • rajeshd01/php-class-project
  • Florence/php-class-project
  • vishnu191299/php-class-project
  • Rakeshrakki/php-class-project
  • sanjay057/php-class-project
  • amarsanthoshsanthosh/photogram-project-cp
  • md_ashmar/php-class-project
  • k.nandhishwaran777k/php-class-project
52 results
Show changes
Showing
with 4021 additions and 0 deletions
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class Wait080
{
/**
* @var array
*/
protected $wait = array(
'connection.start' => '10,10',
'connection.start_ok' => '10,11',
'connection.secure' => '10,20',
'connection.secure_ok' => '10,21',
'connection.tune' => '10,30',
'connection.tune_ok' => '10,31',
'connection.open' => '10,40',
'connection.open_ok' => '10,41',
'connection.redirect' => '10,50',
'connection.close' => '10,60',
'connection.close_ok' => '10,61',
'channel.open' => '20,10',
'channel.open_ok' => '20,11',
'channel.flow' => '20,20',
'channel.flow_ok' => '20,21',
'channel.alert' => '20,30',
'channel.close' => '20,40',
'channel.close_ok' => '20,41',
'access.request' => '30,10',
'access.request_ok' => '30,11',
'exchange.declare' => '40,10',
'exchange.declare_ok' => '40,11',
'exchange.delete' => '40,20',
'exchange.delete_ok' => '40,21',
'queue.declare' => '50,10',
'queue.declare_ok' => '50,11',
'queue.bind' => '50,20',
'queue.bind_ok' => '50,21',
'queue.purge' => '50,30',
'queue.purge_ok' => '50,31',
'queue.delete' => '50,40',
'queue.delete_ok' => '50,41',
'queue.unbind' => '50,50',
'queue.unbind_ok' => '50,51',
'basic.qos' => '60,10',
'basic.qos_ok' => '60,11',
'basic.consume' => '60,20',
'basic.consume_ok' => '60,21',
'basic.cancel' => '60,30',
'basic.cancel_ok' => '60,31',
'basic.publish' => '60,40',
'basic.return' => '60,50',
'basic.deliver' => '60,60',
'basic.get' => '60,70',
'basic.get_ok' => '60,71',
'basic.get_empty' => '60,72',
'basic.ack' => '60,80',
'basic.reject' => '60,90',
'basic.recover_async' => '60,100',
'basic.recover' => '60,110',
'basic.recover_ok' => '60,111',
'file.qos' => '70,10',
'file.qos_ok' => '70,11',
'file.consume' => '70,20',
'file.consume_ok' => '70,21',
'file.cancel' => '70,30',
'file.cancel_ok' => '70,31',
'file.open' => '70,40',
'file.open_ok' => '70,41',
'file.stage' => '70,50',
'file.publish' => '70,60',
'file.return' => '70,70',
'file.deliver' => '70,80',
'file.ack' => '70,90',
'file.reject' => '70,100',
'stream.qos' => '80,10',
'stream.qos_ok' => '80,11',
'stream.consume' => '80,20',
'stream.consume_ok' => '80,21',
'stream.cancel' => '80,30',
'stream.cancel_ok' => '80,31',
'stream.publish' => '80,40',
'stream.return' => '80,50',
'stream.deliver' => '80,60',
'tx.select' => '90,10',
'tx.select_ok' => '90,11',
'tx.commit' => '90,20',
'tx.commit_ok' => '90,21',
'tx.rollback' => '90,30',
'tx.rollback_ok' => '90,31',
'dtx.select' => '100,10',
'dtx.select_ok' => '100,11',
'dtx.start' => '100,20',
'dtx.start_ok' => '100,21',
'tunnel.request' => '110,10',
'test.integer' => '120,10',
'test.integer_ok' => '120,11',
'test.string' => '120,20',
'test.string_ok' => '120,21',
'test.table' => '120,30',
'test.table_ok' => '120,31',
'test.content' => '120,40',
'test.content_ok' => '120,41',
);
/**
* @var string $method
* @return string
*/
public function get_wait($method)
{
return $this->wait[$method];
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Helper\Protocol;
class Wait091
{
/**
* @var array
*/
protected $wait = array(
'connection.start' => '10,10',
'connection.start_ok' => '10,11',
'connection.secure' => '10,20',
'connection.secure_ok' => '10,21',
'connection.tune' => '10,30',
'connection.tune_ok' => '10,31',
'connection.open' => '10,40',
'connection.open_ok' => '10,41',
'connection.close' => '10,50',
'connection.close_ok' => '10,51',
'connection.blocked' => '10,60',
'connection.unblocked' => '10,61',
'channel.open' => '20,10',
'channel.open_ok' => '20,11',
'channel.flow' => '20,20',
'channel.flow_ok' => '20,21',
'channel.close' => '20,40',
'channel.close_ok' => '20,41',
'access.request' => '30,10',
'access.request_ok' => '30,11',
'exchange.declare' => '40,10',
'exchange.declare_ok' => '40,11',
'exchange.delete' => '40,20',
'exchange.delete_ok' => '40,21',
'exchange.bind' => '40,30',
'exchange.bind_ok' => '40,31',
'exchange.unbind' => '40,40',
'exchange.unbind_ok' => '40,51',
'queue.declare' => '50,10',
'queue.declare_ok' => '50,11',
'queue.bind' => '50,20',
'queue.bind_ok' => '50,21',
'queue.purge' => '50,30',
'queue.purge_ok' => '50,31',
'queue.delete' => '50,40',
'queue.delete_ok' => '50,41',
'queue.unbind' => '50,50',
'queue.unbind_ok' => '50,51',
'basic.qos' => '60,10',
'basic.qos_ok' => '60,11',
'basic.consume' => '60,20',
'basic.consume_ok' => '60,21',
'basic.cancel' => '60,30',
'basic.cancel_ok' => '60,31',
'basic.publish' => '60,40',
'basic.return' => '60,50',
'basic.deliver' => '60,60',
'basic.get' => '60,70',
'basic.get_ok' => '60,71',
'basic.get_empty' => '60,72',
'basic.ack' => '60,80',
'basic.reject' => '60,90',
'basic.recover_async' => '60,100',
'basic.recover' => '60,110',
'basic.recover_ok' => '60,111',
'basic.nack' => '60,120',
'tx.select' => '90,10',
'tx.select_ok' => '90,11',
'tx.commit' => '90,20',
'tx.commit_ok' => '90,21',
'tx.rollback' => '90,30',
'tx.rollback_ok' => '90,31',
'confirm.select' => '85,10',
'confirm.select_ok' => '85,11',
);
/**
* @var string $method
* @return string
*/
public function get_wait($method)
{
return $this->wait[$method];
}
}
<?php
namespace PhpAmqpLib\Helper;
/**
* @property-read int $SOCKET_EPIPE
* @property-read int $SOCKET_ENETDOWN
* @property-read int $SOCKET_ENETUNREACH
* @property-read int $SOCKET_ENETRESET
* @property-read int $SOCKET_ECONNABORTED
* @property-read int $SOCKET_ECONNRESET
* @property-read int $SOCKET_ECONNREFUSED
* @property-read int $SOCKET_ETIMEDOUT
* @property-read int $SOCKET_EWOULDBLOCK
* @property-read int $SOCKET_EINTR
* @property-read int $SOCKET_EAGAIN
*/
final class SocketConstants
{
/**
* @var int[]
*/
private $constants;
/** @var self */
private static $instance;
public function __construct()
{
$constants = get_defined_constants(true);
if (isset($constants['sockets'])) {
$this->constants = $constants['sockets'];
} else {
trigger_error('Sockets extension is not enabled', E_USER_WARNING);
$this->constants = array();
}
}
/**
* @param string $name
* @return int
*/
public function __get($name)
{
return isset($this->constants[$name]) ? $this->constants[$name] : 0;
}
/**
* @param string $name
* @param int $value
* @internal
*/
public function __set($name, $value)
{
}
/**
* @param string $name
* @return bool
*/
public function __isset($name)
{
return isset($this->constants[$name]);
}
/**
* @return self
*/
public static function getInstance()
{
if (!self::$instance) {
self::$instance = new self();
}
return self::$instance;
}
}
<?php
namespace PhpAmqpLib\Message;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Exception\AMQPEmptyDeliveryTagException;
use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPWriter;
/**
* A Message for use with the Channnel.basic_* methods.
*/
class AMQPMessage
{
const DELIVERY_MODE_NON_PERSISTENT = 1;
const DELIVERY_MODE_PERSISTENT = 2;
/** @var string */
public $body;
/** @var int */
public $body_size;
/** @var bool */
public $is_truncated = false;
/** @var string */
public $content_encoding;
/** @var int */
private $deliveryTag;
/** @var string|null */
private $consumerTag;
/** @var bool|null */
private $redelivered;
/** @var string|null */
private $exchange;
/** @var string|null */
private $routingKey;
/** @var int|null */
private $messageCount;
/** @var AMQPChannel|null */
private $channel;
/** @var bool */
private $responded = false;
/**
* @var array
* @internal
* @deprecated
*/
public $delivery_info = array();
/** @var array Properties content */
protected $properties = array();
/** @var null|string Compiled properties */
protected $serialized_properties;
/** @var array */
protected static $propertyDefinitions = array(
'content_type' => 'shortstr',
'content_encoding' => 'shortstr',
'application_headers' => 'table_object',
'delivery_mode' => 'octet',
'priority' => 'octet',
'correlation_id' => 'shortstr',
'reply_to' => 'shortstr',
'expiration' => 'shortstr',
'message_id' => 'shortstr',
'timestamp' => 'timestamp',
'type' => 'shortstr',
'user_id' => 'shortstr',
'app_id' => 'shortstr',
'cluster_id' => 'shortstr',
);
/**
* @param string $body
* @param array $properties
*/
public function __construct($body = '', $properties = array())
{
$this->setBody($body);
if (!empty($properties) && is_array($properties)) {
$this->properties = array_intersect_key($properties, self::$propertyDefinitions);
}
}
/**
* Acknowledge one or more messages.
*
* @param bool $multiple If true, the delivery tag is treated as "up to and including",
* so that multiple messages can be acknowledged with a single method.
* @since 2.12.0
* @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.ack
*/
public function ack($multiple = false)
{
$this->assertUnacked();
$this->channel->basic_ack($this->deliveryTag, $multiple);
$this->onResponse();
}
/**
* Reject one or more incoming messages.
*
* @param bool $requeue If true, the server will attempt to requeue the message. If requeue is false or the requeue
* attempt fails the messages are discarded or dead-lettered.
* @param bool $multiple If true, the delivery tag is treated as "up to and including",
* so that multiple messages can be rejected with a single method.
* @since 2.12.0
* @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.nack
*/
public function nack($requeue = false, $multiple = false)
{
$this->assertUnacked();
$this->channel->basic_nack($this->deliveryTag, $multiple, $requeue);
$this->onResponse();
}
/**
* Reject an incoming message.
*
* @param bool $requeue If requeue is true, the server will attempt to requeue the message.
* If requeue is false or the requeue attempt fails the messages are discarded or dead-lettered.
* @since 2.12.0
* @link https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.reject
*/
public function reject($requeue = true)
{
$this->assertUnacked();
$this->channel->basic_reject($this->deliveryTag, $requeue);
$this->onResponse();
}
/**
* @throws \LogicException When response to broker was already sent.
*/
protected function assertUnacked()
{
if (!$this->channel || $this->responded) {
throw new \LogicException('Message is not published or response was already sent');
}
}
protected function onResponse()
{
$this->responded = true;
}
/**
* @return AMQPChannel|null
* @since 2.12.0
*/
public function getChannel()
{
return $this->channel;
}
/**
* @param AMQPChannel $channel
* @return $this
* @throws \RuntimeException
* @since 2.12.0
*/
public function setChannel($channel)
{
if ($this->channel) {
throw new \RuntimeException('A message is already assigned to channel');
}
$this->channel = $channel;
$this->delivery_info['channel'] = $channel;
return $this;
}
/**
* @param int $deliveryTag
* @param bool $redelivered
* @param string $exchange
* @param string $routingKey
* @return $this
* @since 2.12.0
*/
public function setDeliveryInfo($deliveryTag, $redelivered, $exchange, $routingKey)
{
$this->deliveryTag = $this->delivery_info['delivery_tag'] = $deliveryTag;
$this->redelivered = $this->delivery_info['redelivered'] = $redelivered;
$this->exchange = $this->delivery_info['exchange'] = $exchange;
$this->routingKey = $this->delivery_info['routing_key'] = $routingKey;
return $this;
}
/**
* @return bool|null
* @since 2.12.0
*/
public function isRedelivered()
{
return $this->redelivered;
}
/**
* @return string|null
* @since 2.12.0
*/
public function getExchange()
{
return $this->exchange;
}
/**
* @return string|null
* @since 2.12.0
*/
public function getRoutingKey()
{
return $this->routingKey;
}
/**
* @return string|null
* @since 2.12.0
*/
public function getConsumerTag()
{
return $this->consumerTag;
}
/**
* @param string $consumerTag
* @return $this
* @since 2.12.0
*/
public function setConsumerTag($consumerTag)
{
$this->consumerTag = $consumerTag;
$this->delivery_info['consumer_tag'] = $consumerTag;
return $this;
}
/**
* @return int|null
* @since 2.12.0
*/
public function getMessageCount()
{
return $this->messageCount;
}
/**
* @param int $messageCount
* @return $this
* @since 2.12.0
*/
public function setMessageCount($messageCount)
{
$this->messageCount = (int)$messageCount;
$this->delivery_info['message_count'] = $this->messageCount;
return $this;
}
/**
* @return string
*/
public function getBody()
{
return $this->body;
}
/**
* Sets the message payload
*
* @param string $body
* @return $this
*/
public function setBody($body)
{
$this->body = $body;
return $this;
}
/**
* @return string
*/
public function getContentEncoding()
{
return $this->content_encoding;
}
/**
* @return int
*/
public function getBodySize()
{
return $this->body_size;
}
/**
* @param int $body_size Message body size in byte(s)
* @return AMQPMessage
*/
public function setBodySize($body_size)
{
$this->body_size = (int)$body_size;
return $this;
}
/**
* @return boolean
*/
public function isTruncated()
{
return $this->is_truncated;
}
/**
* @param bool $is_truncated
* @return AMQPMessage
*/
public function setIsTruncated($is_truncated)
{
$this->is_truncated = (bool)$is_truncated;
return $this;
}
/**
* @param int|string $deliveryTag
* @return $this
* @since 2.12.0
*/
public function setDeliveryTag($deliveryTag)
{
if (!empty($this->deliveryTag)) {
throw new \LogicException('Delivery tag cannot be changed');
}
$this->deliveryTag = $deliveryTag;
$this->delivery_info['delivery_tag'] = $deliveryTag;
return $this;
}
/**
* @return int
*
* @throws AMQPEmptyDeliveryTagException
*/
public function getDeliveryTag()
{
if (empty($this->deliveryTag)) {
throw new AMQPEmptyDeliveryTagException('This message was not delivered yet');
}
return $this->deliveryTag;
}
/**
* Check whether a property exists in the 'properties' dictionary
* or if present - in the 'delivery_info' dictionary.
*
* @param string $name
* @return bool
*/
public function has($name)
{
return isset($this->properties[$name]) || isset($this->delivery_info[$name]);
}
/**
* Look for additional properties in the 'properties' dictionary,
* and if present - the 'delivery_info' dictionary.
*
* @param string $name
* @return mixed|AMQPChannel
* @throws \OutOfBoundsException
*/
public function get($name)
{
if (isset($this->properties[$name])) {
return $this->properties[$name];
}
if (isset($this->delivery_info[$name])) {
return $this->delivery_info[$name];
}
throw new \OutOfBoundsException(sprintf(
'No "%s" property',
$name
));
}
/**
* Returns the properties content
*
* @return array
*/
public function get_properties()
{
return $this->properties;
}
/**
* Sets a property value
*
* @param string $name The property name (one of the property definition)
* @param mixed $value The property value
* @throws \OutOfBoundsException
*/
public function set($name, $value)
{
if (!array_key_exists($name, self::$propertyDefinitions)) {
throw new \OutOfBoundsException(sprintf(
'No "%s" property',
$name
));
}
if (isset($this->properties[$name]) && $this->properties[$name] === $value) {
// same value, nothing to do
return;
}
$this->properties[$name] = $value;
$this->serialized_properties = null;
}
/**
* Given the raw bytes containing the property-flags and
* property-list from a content-frame-header, parse and insert
* into a dictionary stored in this object as an attribute named
* 'properties'.
*
* @param AMQPReader $reader
* NOTE: do not mutate $reader
* @return $this
*/
public function load_properties(AMQPReader $reader)
{
// Read 16-bit shorts until we get one with a low bit set to zero
$flags = array();
while (true) {
$flag_bits = $reader->read_short();
$flags[] = $flag_bits;
if (($flag_bits & 1) === 0) {
break;
}
}
$shift = 0;
$data = array();
foreach (self::$propertyDefinitions as $key => $proptype) {
if ($shift === 0) {
if (!$flags) {
break;
}
$flag_bits = array_shift($flags);
$shift = 15;
}
if ($flag_bits & (1 << $shift)) {
$data[$key] = $reader->{'read_' . $proptype}();
}
$shift -= 1;
}
$this->properties = $data;
return $this;
}
/**
* Serializes the 'properties' attribute (a dictionary) into the
* raw bytes making up a set of property flags and a property
* list, suitable for putting into a content frame header.
*
* @return string
* @todo Inject the AMQPWriter to make the method easier to test
*/
public function serialize_properties()
{
if (!empty($this->serialized_properties)) {
return $this->serialized_properties;
}
$shift = 15;
$flag_bits = 0;
$flags = array();
$raw_bytes = new AMQPWriter();
foreach (self::$propertyDefinitions as $key => $prototype) {
$val = isset($this->properties[$key]) ? $this->properties[$key] : null;
// Very important: PHP type eval is weak, use the === to test the
// value content. Zero or false value should not be removed
if ($val === null) {
$shift -= 1;
continue;
}
if ($shift === 0) {
$flags[] = $flag_bits;
$flag_bits = 0;
$shift = 15;
}
$flag_bits |= (1 << $shift);
if ($prototype !== 'bit') {
$raw_bytes->{'write_' . $prototype}($val);
}
$shift -= 1;
}
$flags[] = $flag_bits;
$result = new AMQPWriter();
foreach ($flags as $flag_bits) {
$result->write_short($flag_bits);
}
$result->write($raw_bytes->getvalue());
$this->serialized_properties = $result->getvalue();
return $this->serialized_properties;
}
}
<?php
namespace PhpAmqpLib;
final class Package
{
public const NAME = 'AMQPLib';
public const VERSION = '3.5.1';
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Exception;
use PhpAmqpLib\Wire;
/**
* Iterator implemented for transparent integration with AMQPWriter::write_[array|table]()
*/
abstract class AMQPAbstractCollection implements \Iterator, \ArrayAccess
{
//protocol defines available field types and their corresponding symbols
const PROTOCOL_RBT = 'rabbit'; //pseudo proto
//Abstract data types
const T_INT_SHORTSHORT = 1;
const T_INT_SHORTSHORT_U = 2;
const T_INT_SHORT = 3;
const T_INT_SHORT_U = 4;
const T_INT_LONG = 5;
const T_INT_LONG_U = 6;
const T_INT_LONGLONG = 7;
const T_INT_LONGLONG_U = 8;
const T_DECIMAL = 9;
const T_TIMESTAMP = 10;
const T_VOID = 11;
const T_BOOL = 12;
const T_STRING_SHORT = 13;
const T_STRING_LONG = 14;
const T_ARRAY = 15;
const T_TABLE = 16;
const T_BYTES = 17;
const T_FLOAT = 18;
const T_DOUBLE = 19;
/**
* @var string
*/
private static $protocol;
/*
* Field types messy mess http://www.rabbitmq.com/amqp-0-9-1-errata.html#section_3
* Default behaviour is to use rabbitMQ compatible field-set
* Define AMQP_STRICT_FLD_TYPES=true to use strict AMQP instead
* @var array<int, string>
*/
private static $types_080 = array(
self::T_INT_LONG => 'I',
self::T_DECIMAL => 'D',
self::T_TIMESTAMP => 'T',
self::T_STRING_LONG => 'S',
self::T_TABLE => 'F'
);
/**
* @var array<int, string>
*/
private static $types_091 = array(
self::T_INT_SHORTSHORT => 'b',
self::T_INT_SHORTSHORT_U => 'B',
self::T_INT_SHORT => 'U',
self::T_INT_SHORT_U => 'u',
self::T_INT_LONG => 'I',
self::T_INT_LONG_U => 'i',
self::T_INT_LONGLONG => 'L',
self::T_INT_LONGLONG_U => 'l',
self::T_FLOAT => 'f',
self::T_DOUBLE => 'd',
self::T_DECIMAL => 'D',
self::T_TIMESTAMP => 'T',
self::T_VOID => 'V',
self::T_BOOL => 't',
self::T_STRING_SHORT => 's',
self::T_STRING_LONG => 'S',
self::T_ARRAY => 'A',
self::T_TABLE => 'F',
self::T_BYTES => 'x',
);
/**
* @var array<int, string>
*/
private static $types_rabbit = array(
self::T_INT_SHORTSHORT => 'b',
self::T_INT_SHORTSHORT_U => 'B',
self::T_INT_SHORT => 's',
self::T_INT_SHORT_U => 'u',
self::T_INT_LONG => 'I',
self::T_INT_LONG_U => 'i',
self::T_INT_LONGLONG => 'l',
self::T_FLOAT => 'f',
self::T_DOUBLE => 'd',
self::T_DECIMAL => 'D',
self::T_TIMESTAMP => 'T',
self::T_VOID => 'V',
self::T_BOOL => 't',
self::T_STRING_LONG => 'S',
self::T_ARRAY => 'A',
self::T_TABLE => 'F',
self::T_BYTES => 'x',
);
/**
* @var array
*/
protected $data = array();
public function __construct(array $data = null)
{
if (!empty($data)) {
$this->data = $this->encodeCollection($data);
}
}
/**
* @return int
*/
abstract public function getType();
/**
* @param mixed $val
* @param int|null $type
* @param string $key
*/
final protected function setValue($val, $type = null, $key = null)
{
if ($val instanceof self) {
if ($type && ($type !== $val->getType())) {
throw new Exception\AMQPInvalidArgumentException(
sprintf(
'Attempted to add instance of %s representing type [%s] as mismatching type [%s]',
get_class($val),
$val->getType(),
$type
)
);
}
$type = $val->getType();
} elseif ($type) { //ensuring data integrity and that all members are properly validated
switch ($type) {
case self::T_ARRAY:
throw new Exception\AMQPInvalidArgumentException('Arrays must be passed as AMQPArray instance');
case self::T_TABLE:
throw new Exception\AMQPInvalidArgumentException('Tables must be passed as AMQPTable instance');
case self::T_DECIMAL:
if (!($val instanceof AMQPDecimal)) {
throw new Exception\AMQPInvalidArgumentException(
'Decimal values must be instance of AMQPDecimal'
);
}
break;
}
}
if ($type) {
self::checkDataTypeIsSupported($type, false);
$val = array($type, $val);
} else {
$val = $this->encodeValue($val);
}
if ($key === null) {
$this->data[] = $val;
} else {
$this->data[$key] = $val;
}
}
/**
* @return array
*/
final public function getNativeData()
{
return $this->decodeCollection($this->data);
}
/**
* @param array $val
* @return array
*/
final protected function encodeCollection(array $val)
{
foreach ($val as $k => $v) {
$val[$k] = $this->encodeValue($v);
}
return $val;
}
/**
* @param array $val
* @return array
*/
final protected function decodeCollection(array $val)
{
foreach ($val as $k => $v) {
$val[$k] = $this->decodeValue($v[1], $v[0]);
}
return $val;
}
public function offsetExists($offset): bool
{
return isset($this->data[$offset]);
}
/**
* @param mixed $offset
* @return mixed
*/
#[\ReturnTypeWillChange]
public function offsetGet($offset)
{
$value = isset($this->data[$offset]) ? $this->data[$offset] : null;
return is_array($value) ? $value[1] : $value;
}
public function offsetSet($offset, $value): void
{
$this->setValue($value, null, $offset);
}
public function offsetUnset($offset): void
{
unset($this->data[$offset]);
}
/**
* @param mixed $val
* @return mixed
* @throws Exception\AMQPOutOfBoundsException
*/
protected function encodeValue($val)
{
if (is_string($val)) {
$val = $this->encodeString($val);
} elseif (is_float($val)) {
$val = $this->encodeFloat($val);
} elseif (is_int($val)) {
$val = $this->encodeInt($val);
} elseif (is_bool($val)) {
$val = $this->encodeBool($val);
} elseif (is_null($val)) {
$val = $this->encodeVoid();
} elseif ($val instanceof \DateTimeInterface) {
$val = array(self::T_TIMESTAMP, $val->getTimestamp());
} elseif ($val instanceof AMQPDecimal) {
$val = array(self::T_DECIMAL, $val);
} elseif ($val instanceof self) {
//avoid silent type correction of strictly typed values
self::checkDataTypeIsSupported($val->getType(), false);
$val = array($val->getType(), $val);
} elseif (is_array($val)) {
//AMQP specs says "Field names MUST start with a letter, '$' or '#'"
//so beware, some servers may raise an exception with 503 code in cases when indexed
// array is encoded as table
if (self::isProtocol(Wire\Constants080::VERSION)) {
//080 doesn't support arrays, forcing table
$val = array(self::T_TABLE, new AMQPTable($val));
} elseif (empty($val) || (array_keys($val) === range(0, count($val) - 1))) {
$val = array(self::T_ARRAY, new AMQPArray($val));
} else {
$val = array(self::T_TABLE, new AMQPTable($val));
}
} else {
throw new Exception\AMQPOutOfBoundsException(
sprintf('Encountered value of unsupported type: %s', gettype($val))
);
}
return $val;
}
/**
* @param mixed $val
* @param int $type
* @return array|bool|\DateTime|null
*/
protected function decodeValue($val, $type)
{
if ($val instanceof self) {
//covering arrays and tables
$val = $val->getNativeData();
} else {
switch ($type) {
case self::T_BOOL:
$val = (bool) $val;
break;
case self::T_TIMESTAMP:
$val = \DateTime::createFromFormat('U', $val);
break;
case self::T_VOID:
$val = null;
break;
case self::T_ARRAY:
case self::T_TABLE:
throw new Exception\AMQPLogicException(
sprintf(
'%s %s',
'Encountered an array/table struct which is not an instance of AMQPCollection.',
'This is considered a bug and should be fixed, please report'
)
);
}
}
return $val;
}
/**
* @param string $val
* @return array
*/
protected function encodeString($val)
{
return array(self::T_STRING_LONG, $val);
}
/**
* @param int $val
* @return array
*/
protected function encodeInt($val)
{
if (($val >= -2147483648) && ($val <= 2147483647)) {
$ev = array(self::T_INT_LONG, $val);
} elseif (self::isProtocol(Wire\Constants080::VERSION)) {
//080 doesn't support longlong
$ev = $this->encodeString((string) $val);
} else {
$ev = array(self::T_INT_LONGLONG, $val);
}
return $ev;
}
/**
* @param float $val
* @return array
*/
protected function encodeFloat($val)
{
return $this->encodeString((string) $val);
}
/**
* @param bool $val
* @return array
*/
protected function encodeBool($val)
{
$val = (bool) $val;
return self::isProtocol(Wire\Constants080::VERSION)
? array(self::T_INT_LONG, (int) $val)
: array(self::T_BOOL, $val);
}
/**
* @return array
*/
protected function encodeVoid()
{
return self::isProtocol(Wire\Constants080::VERSION) ? $this->encodeString('') : array(self::T_VOID, null);
}
/**
* @return string
*/
final public static function getProtocol()
{
if (self::$protocol === null) {
self::$protocol = defined('AMQP_STRICT_FLD_TYPES') && AMQP_STRICT_FLD_TYPES ?
AbstractChannel::getProtocolVersion() :
self::PROTOCOL_RBT;
}
return self::$protocol;
}
/**
* @param string $proto
* @return bool
*/
final public static function isProtocol($proto)
{
return self::getProtocol() === $proto;
}
/**
* @return array [dataTypeConstant => dataTypeSymbol]
*/
final public static function getSupportedDataTypes()
{
switch ($proto = self::getProtocol()) {
case Wire\Constants080::VERSION:
$types = self::$types_080;
break;
case Wire\Constants091::VERSION:
$types = self::$types_091;
break;
case self::PROTOCOL_RBT:
$types = self::$types_rabbit;
break;
default:
throw new Exception\AMQPOutOfRangeException(sprintf('Unknown protocol: %s', $proto));
}
return $types;
}
/**
* @param string $type
* @param bool $return Whether to return or raise AMQPOutOfRangeException
* @return boolean
*/
final public static function checkDataTypeIsSupported($type, $return = true)
{
try {
$supported = self::getSupportedDataTypes();
if (!isset($supported[$type])) {
throw new Exception\AMQPOutOfRangeException(sprintf(
'AMQP-%s doesn\'t support data of type [%s]',
self::getProtocol(),
$type
));
}
return true;
} catch (Exception\AMQPOutOfRangeException $ex) {
if (!$return) {
throw $ex;
}
return false;
}
}
/**
* @param int $type
* @return string
*/
final public static function getSymbolForDataType($type)
{
$types = self::getSupportedDataTypes();
if (!isset($types[$type])) {
throw new Exception\AMQPOutOfRangeException(sprintf(
'AMQP-%s doesn\'t support data of type [%s]',
self::getProtocol(),
$type
));
}
return $types[$type];
}
/**
* @param string $symbol
* @return integer
*/
final public static function getDataTypeForSymbol($symbol)
{
$symbols = array_flip(self::getSupportedDataTypes());
if (!isset($symbols[$symbol])) {
throw new Exception\AMQPOutOfRangeException(sprintf(
'AMQP-%s doesn\'t define data of type [%s]',
self::getProtocol(),
$symbol
));
}
return $symbols[$symbol];
}
/**
* @return mixed
*/
#[\ReturnTypeWillChange]
public function current()
{
return current($this->data);
}
/**
* @return mixed
*/
#[\ReturnTypeWillChange]
public function key()
{
return key($this->data);
}
public function next(): void
{
next($this->data);
}
public function rewind(): void
{
reset($this->data);
}
public function valid(): bool
{
return key($this->data) !== null;
}
}
<?php
namespace PhpAmqpLib\Wire;
class AMQPArray extends AMQPAbstractCollection
{
/**
* @param array|null $data
*/
public function __construct(array $data = null)
{
parent::__construct(empty($data) ? null : array_values($data));
}
/**
* @return int
*/
final public function getType()
{
return self::T_ARRAY;
}
/**
* @param mixed $val
* @param int|null $type
* @return $this
*/
public function push($val, $type = null)
{
$this->setValue($val, $type);
return $this;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPDataReadException;
class AMQPBufferReader extends AMQPReader
{
/**
* @var string
*/
private $buffer;
/**
* @var int
*/
private $length;
public function __construct(string $buffer)
{
$this->buffer = $buffer;
$this->length = mb_strlen($buffer, 'ASCII');
}
public function close(): void
{
}
/**
* Resets the object from the injected param
*
* Used to not need to create a new AMQPBufferReader instance every time.
* when we can just pass a string and reset the object state.
* NOTE: since we are working with strings we don't need to pass an AbstractIO
* or a timeout.
*
* @param string $str
*/
public function reset(string $str): void
{
$this->buffer = $str;
$this->length = mb_strlen($this->buffer, 'ASCII');
$this->offset = 0;
$this->resetCounters();
}
protected function rawread(int $n): string
{
if ($this->length < $n) {
throw new AMQPDataReadException(sprintf(
'Error reading data. Requested %s bytes while string buffer has only %s',
$n,
$this->length
));
}
$res = mb_substr($this->buffer, 0, $n, 'ASCII');
$this->buffer = mb_substr($this->buffer, $n, null, 'ASCII');
$this->length -= $n;
$this->offset += $n;
return $res;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Helper\BigInteger;
abstract class AMQPByteStream
{
public const BIT = 1;
public const OCTET = 1;
public const SHORTSTR = 1;
public const SHORT = 2;
public const LONG = 4;
public const SIGNED_LONG = 4;
public const READ_PHP_INT = 4; // use READ_ to avoid possible clashes with PHP
public const LONGLONG = 8;
public const TIMESTAMP = 8;
/** @var bool */
protected const PLATFORM_64BIT = PHP_INT_SIZE === 8;
/** @var BigInteger[][] */
protected static $bigIntegers = array();
/**
* @var bool
*/
protected static $isLittleEndian;
/**
* Converts byte-string between native and network byte order, in both directions
*
* @param string $bytes
* @return string
*/
protected function correctEndianness($bytes)
{
return self::isLittleEndian() ? $this->convertByteOrder($bytes) : $bytes;
}
/**
* @param string $bytes
* @return string
*/
protected function convertByteOrder($bytes)
{
return strrev($bytes);
}
/**
* @param int $longInt
* @return bool
*/
protected function getLongMSB($longInt)
{
return (bool) ($longInt & 0x80000000);
}
/**
* @param string $bytes
* @return bool
*/
protected function getMSB($bytes)
{
return ord($bytes[0]) > 127;
}
/**
* @return bool
*/
protected static function isLittleEndian()
{
if (self::$isLittleEndian === null) {
$tmp = unpack('S', "\x01\x00"); // to maintain 5.3 compatibility
self::$isLittleEndian = $tmp[1] === 1;
}
return self::$isLittleEndian;
}
/**
* @param string $value
* @param int $base
* @return BigInteger
*/
protected static function getBigInteger($value, $base = 10)
{
if (!isset(self::$bigIntegers[$base])) {
self::$bigIntegers[$base] = array();
}
if (isset(self::$bigIntegers[$base][$value])) {
return self::$bigIntegers[$base][$value];
}
$integer = new BigInteger($value, $base);
self::$bigIntegers[$base][$value] = $integer;
return $integer;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Helper\BigInteger;
/**
* AMQP protocol decimal value.
*
* Values are represented as (n,e) pairs. The actual value
* is n * 10^(-e).
*
* From 0.8 spec: Decimal values are
* not intended to support floating point values, but rather
* business values such as currency rates and amounts. The
* 'decimals' octet is not signed.
*/
class AMQPDecimal
{
/** @var int */
protected $n;
/** @var int */
protected $e;
/**
* @param int $n
* @param int $e
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
*/
public function __construct($n, $e)
{
if ($e < 0) {
throw new AMQPOutOfBoundsException('Decimal exponent value must be unsigned!');
}
$this->n = $n;
$this->e = $e;
}
/**
* @return string
*/
public function asBCvalue()
{
$n = new BigInteger($this->n);
$e = new BigInteger('1' . str_repeat('0', $this->e));
list($q) = $n->divide($e);
return $q->toString();
}
/**
* @return int
*/
public function getE()
{
return $this->e;
}
/**
* @return int
*/
public function getN()
{
return $this->n;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Wire\IO\AbstractIO;
use RuntimeException;
class AMQPIOReader extends AMQPReader
{
/** @var AbstractIO */
private $io;
/** @var int|float|null */
protected $timeout;
public function __construct(AbstractIO $io, $timeout = 0)
{
$this->io = $io;
$this->timeout = $timeout;
}
public function close(): void
{
$this->io->close();
}
/**
* @return float|int|mixed|null
*/
public function getTimeout()
{
return $this->timeout;
}
/**
* Sets the timeout (second)
*
* @param int|float|null $timeout
*/
public function setTimeout($timeout)
{
$this->timeout = $timeout;
}
/**
* @param int $n
* @return string
* @throws RuntimeException
* @throws AMQPDataReadException|AMQPNoDataException|AMQPIOException
*/
protected function rawread(int $n): string
{
$res = '';
while (true) {
$this->wait();
try {
$res = $this->io->read($n);
break;
} catch (AMQPTimeoutException $e) {
if ($this->getTimeout() > 0) {
throw $e;
}
}
}
$this->offset += $n;
return $res;
}
/**
* Waits until some data is retrieved from the socket.
*
* AMQPTimeoutException can be raised if the timeout is set
*
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException when timeout is set and no data received
* @throws \PhpAmqpLib\Exception\AMQPNoDataException when no data is ready to read from IO
*/
protected function wait()
{
$timeout = $this->timeout;
if (null === $timeout) {
// timeout=null just poll state and return instantly
$sec = 0;
$usec = 0;
} elseif ($timeout > 0) {
list($sec, $usec) = MiscHelper::splitSecondsMicroseconds($this->getTimeout());
} else {
// wait indefinitely for data if timeout=0
$sec = null;
$usec = 0;
}
$result = $this->io->select($sec, $usec);
if ($result === 0) {
if ($timeout > 0) {
throw new AMQPTimeoutException(sprintf(
'The connection timed out after %s sec while awaiting incoming data',
$timeout
));
} else {
throw new AMQPNoDataException('No data is ready to read');
}
}
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Channel\Frame;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPOutOfBoundsException;
use PhpAmqpLib\Helper\BigInteger;
abstract class AMQPReader extends AMQPByteStream
{
/** @var int */
protected $offset = 0;
/** @var int */
protected $bitcount = 0;
/** @var int */
protected $bits = 0;
/**
* Close the byte stream.
*/
abstract public function close(): void;
abstract protected function rawread(int $n): string;
/**
* @param int $n
* @return string
*/
public function read($n)
{
$this->resetCounters();
return $this->rawread($n);
}
public function read_bit(): bool
{
if (empty($this->bitcount)) {
$this->bits = ord($this->rawread(1));
$this->bitcount = 8;
}
$result = ($this->bits & 1) === 1;
$this->bits >>= 1;
$this->bitcount--;
return $result;
}
/**
* @return int
*/
public function read_octet()
{
$this->resetCounters();
list(, $res) = unpack('C', $this->rawread(1));
return $res;
}
/**
* @return int
*/
public function read_signed_octet()
{
$this->resetCounters();
list(, $res) = unpack('c', $this->rawread(1));
return $res;
}
/**
* @return int
*/
public function read_short()
{
$this->resetCounters();
list(, $res) = unpack('n', $this->rawread(2));
return $res;
}
/**
* @return int
*/
public function read_signed_short()
{
$this->resetCounters();
list(, $res) = unpack('s', $this->correctEndianness($this->rawread(2)));
return $res;
}
/**
* Reads 32 bit integer in big-endian byte order.
*
* On 64 bit systems it will return always unsigned int
* value in 0..2^32 range.
*
* On 32 bit systems it will return signed int value in
* -2^31...+2^31 range.
*
* Use with caution!
* @return int|string
*/
public function read_php_int()
{
list(, $res) = unpack('N', $this->rawread(4));
if (self::PLATFORM_64BIT) {
return (int) sprintf('%u', $res);
}
return $res;
}
/**
* PHP does not have unsigned 32 bit int,
* so we return it as a string
*
* @return int|string
*/
public function read_long()
{
$this->resetCounters();
list(, $res) = unpack('N', $this->rawread(4));
if (!self::PLATFORM_64BIT && $this->getLongMSB($res)) {
return sprintf('%u', $res);
}
return $res;
}
/**
* @return int
*/
private function readSignedLong()
{
$this->resetCounters();
list(, $res) = unpack('l', $this->correctEndianness($this->rawread(4)));
return $res;
}
/**
* Even on 64 bit systems PHP integers are signed.
* Since we need an unsigned value here we return it as a string.
*
* @return int|string
*/
public function read_longlong()
{
$this->resetCounters();
$bytes = $this->rawread(8);
if (self::PLATFORM_64BIT) {
// we can "unpack" if MSB bit is 0 (at most 63 bit integer), fallback to BigInteger otherwise
if (!$this->getMSB($bytes)) {
$res = unpack('J', $bytes);
return $res[1];
}
} else {
// on 32-bit systems we can "unpack" up to 31 bits integer
list(, $hi, $lo) = unpack('N2', $bytes);
if ($hi === 0 && $lo > 0) {
return $lo;
}
}
$var = new BigInteger($bytes, 256);
return $var->toString();
}
/**
* @return int|string
*/
public function read_signed_longlong()
{
$this->resetCounters();
$bytes = $this->rawread(8);
if (self::PLATFORM_64BIT) {
$res = unpack('q', $this->correctEndianness($bytes));
return $res[1];
} else {
// on 32-bit systems we can "unpack" up to 31 bits integer
list(, $hi, $lo) = unpack('N2', $bytes);
if ($hi === 0 && $lo > 0) {
// positive and less than 2^31-1
return $lo;
}
// negative and more than -2^31
if ($hi === -1 && $this->getLongMSB($lo)) {
return $lo;
}
}
$var = new BigInteger($bytes, -256);
return $var->toString();
}
/**
* @return float
*/
public function read_float()
{
$this->resetCounters();
list(, $res) = unpack('G', $this->rawread(4));
return (float)$res;
}
/**
* @return float
*/
public function read_double()
{
$this->resetCounters();
list(, $res) = unpack('E', $this->rawread(8));
return (float)$res;
}
/**
* Read a utf-8 encoded string that's stored in up to
* 255 bytes. Return it decoded as a PHP unicode object.
* @return string
*/
public function read_shortstr()
{
$this->resetCounters();
list(, $slen) = unpack('C', $this->rawread(1));
return $this->rawread($slen);
}
/**
* Read a string that's up to 2**32 bytes, the encoding
* isn't specified in the AMQP spec, so just return it as
* a plain PHP string.
* @return string
*/
public function read_longstr()
{
$this->resetCounters();
$slen = $this->read_php_int();
if ($slen < 0) {
throw new AMQPOutOfBoundsException('Strings longer than supported on this platform');
}
return $this->rawread($slen);
}
/**
* Read and AMQP timestamp, which is a 64-bit integer representing
* seconds since the Unix epoch in 1-second resolution.
* @return int|string
*/
public function read_timestamp()
{
return $this->read_longlong();
}
/**
* Read an AMQP table, and return as a PHP array. keys are strings,
* values are (type,value) tuples.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPTable
*/
public function read_table(bool $returnObject = false)
{
$this->resetCounters();
$tlen = $this->read_php_int();
if ($tlen < 0) {
throw new AMQPOutOfBoundsException('Table is longer than supported');
}
$table_data = new AMQPBufferReader($this->rawread($tlen));
$result = $returnObject ? new AMQPTable() : array();
while ($table_data->tell() < $tlen) {
$name = $table_data->read_shortstr();
$ftype = AMQPAbstractCollection::getDataTypeForSymbol($ftypeSym = $table_data->rawread(1));
$val = $table_data->read_value($ftype, $returnObject);
$returnObject ? $result->set($name, $val, $ftype) : $result[$name] = array($ftypeSym, $val);
}
return $result;
}
/**
* @return array|AMQPTable
*/
public function read_table_object()
{
return $this->read_table(true);
}
/**
* Reads the array in the next value.
*
* @param bool $returnObject Whether to return AMQPArray instance instead of plain array
* @return array|AMQPArray
*/
public function read_array($returnObject = false)
{
$this->resetCounters();
// Determine array length and its end position
$arrayLength = $this->read_php_int();
$endOffset = $this->offset + $arrayLength;
$result = $returnObject ? new AMQPArray() : array();
// Read values until we reach the end of the array
while ($this->offset < $endOffset) {
$fieldType = AMQPAbstractCollection::getDataTypeForSymbol($this->rawread(1));
$fieldValue = $this->read_value($fieldType, $returnObject);
$returnObject ? $result->push($fieldValue, $fieldType) : $result[] = $fieldValue;
}
return $result;
}
/**
* @return array|AMQPArray
*/
public function read_array_object()
{
return $this->read_array(true);
}
/**
* @return array{type:int, channel:int, size:int}
*/
public function readFrameHeader(): array
{
return unpack('Ctype/nchannel/Nsize', $this->rawread(Frame::FRAME_HEADER_SIZE));
}
/**
* Reads the next value as the provided field type.
*
* @param int $fieldType One of AMQPAbstractCollection::T_* constants
* @param bool $collectionsAsObjects Description
* @return mixed
* @throws \PhpAmqpLib\Exception\AMQPDataReadException
*/
public function read_value(int $fieldType, bool $collectionsAsObjects = false)
{
$this->resetCounters();
switch ($fieldType) {
case AMQPAbstractCollection::T_INT_SHORTSHORT:
//according to AMQP091 spec, 'b' is not bit, it is short-short-int, also valid for rabbit/qpid
//$val=$this->read_bit();
$val = $this->read_signed_octet();
break;
case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
case AMQPAbstractCollection::T_BOOL:
$val = $this->read_octet();
break;
case AMQPAbstractCollection::T_INT_SHORT:
$val = $this->read_signed_short();
break;
case AMQPAbstractCollection::T_INT_SHORT_U:
$val = $this->read_short();
break;
case AMQPAbstractCollection::T_INT_LONG:
$val = $this->readSignedLong();
break;
case AMQPAbstractCollection::T_INT_LONG_U:
$val = $this->read_long();
break;
case AMQPAbstractCollection::T_INT_LONGLONG:
$val = $this->read_signed_longlong();
break;
case AMQPAbstractCollection::T_INT_LONGLONG_U:
$val = $this->read_longlong();
break;
case AMQPAbstractCollection::T_DECIMAL:
$e = $this->read_octet();
$n = $this->readSignedLong();
$val = new AMQPDecimal($n, $e);
break;
case AMQPAbstractCollection::T_TIMESTAMP:
$val = $this->read_timestamp();
break;
case AMQPAbstractCollection::T_STRING_SHORT:
$val = $this->read_shortstr();
break;
case AMQPAbstractCollection::T_STRING_LONG:
case AMQPAbstractCollection::T_BYTES:
$val = $this->read_longstr();
break;
case AMQPAbstractCollection::T_ARRAY:
$val = $this->read_array($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_TABLE:
$val = $this->read_table($collectionsAsObjects);
break;
case AMQPAbstractCollection::T_VOID:
$val = null;
break;
case AMQPAbstractCollection::T_FLOAT:
$val = $this->read_float();
break;
case AMQPAbstractCollection::T_DOUBLE:
$val = $this->read_double();
break;
default:
throw new AMQPInvalidArgumentException(sprintf(
'Unsupported type "%s"',
$fieldType
));
}
return $val;
}
protected function tell(): int
{
return $this->offset;
}
protected function resetCounters(): void
{
$this->bitcount = $this->bits = 0;
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception;
class AMQPTable extends AMQPAbstractCollection
{
/**
* @return int
*/
final public function getType()
{
return self::T_TABLE;
}
/**
* @param string $key
* @param mixed $val
* @param int|null $type
*/
public function set($key, $val, $type = null)
{
//https://www.rabbitmq.com/resources/specs/amqp0-9-1.pdf, https://www.rabbitmq.com/resources/specs/amqp0-8.pdf
//Field names MUST start with a letter, '$' or '#' and may continue with letters, '$' or '#', digits,
// or underlines, to a maximum length of 128 characters
//The server SHOULD validate field names and upon receiving an invalid field name, it SHOULD signal a connection
// exception with reply code 503 (syntax error)
//validating length only and delegating other stuff to server, as rabbit seems to currently support numeric keys
if (!($len = strlen($key)) || ($len > 128)) {
throw new Exception\AMQPInvalidArgumentException(
'Table key must be non-empty string up to 128 chars in length'
);
}
$this->setValue($val, $type, $key);
}
}
<?php
namespace PhpAmqpLib\Wire;
use PhpAmqpLib\Exception\AMQPInvalidArgumentException;
use PhpAmqpLib\Exception\AMQPOutOfRangeException;
use PhpAmqpLib\Helper\BigInteger;
class AMQPWriter extends AMQPByteStream
{
/** @var string */
protected $out = '';
/** @var array */
protected $bits = array();
/** @var int */
protected $bitcount = 0;
private function flushbits()
{
if (!empty($this->bits)) {
$this->out .= implode('', array_map('chr', $this->bits));
$this->bits = array();
$this->bitcount = 0;
}
}
/**
* Get what's been encoded so far.
*
* @return string
*/
public function getvalue()
{
/* temporarily needed for compatibility with write_bit unit tests */
if ($this->bitcount) {
$this->flushbits();
}
return $this->out;
}
/**
* Write a plain PHP string, with no special encoding.
*
* @param string $s
*
* @return $this
*/
public function write($s)
{
$this->out .= $s;
return $this;
}
/**
* Write a boolean value.
* (deprecated, use write_bits instead)
*
* @deprecated
* @param bool $b
* @return $this
*/
public function write_bit($b)
{
$b = $b ? 1 : 0;
$shift = $this->bitcount % 8;
$last = $shift === 0 ? 0 : array_pop($this->bits);
$last |= ($b << $shift);
$this->bits[] = $last;
$this->bitcount++;
return $this;
}
/**
* Write multiple bits as an octet
*
* @param bool[] $bits
* @return $this
*/
public function write_bits($bits)
{
$value = 0;
foreach ($bits as $n => $bit) {
$bit = $bit ? 1 : 0;
$value |= ($bit << $n);
}
$this->out .= chr($value);
return $this;
}
/**
* Write an integer as an unsigned 8-bit value
*
* @param int $n
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_octet($n)
{
if ($n < 0 || $n > 255) {
throw new AMQPInvalidArgumentException('Octet out of range: ' . $n);
}
$this->out .= chr($n);
return $this;
}
/**
* @param int $n
* @return $this
*/
public function write_signed_octet($n)
{
if (($n < -128) || ($n > 127)) {
throw new AMQPInvalidArgumentException('Signed octet out of range: ' . $n);
}
$this->out .= pack('c', $n);
return $this;
}
/**
* Write an integer as an unsigned 16-bit value
*
* @param int $n
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_short($n)
{
if ($n < 0 || $n > 65535) {
throw new AMQPInvalidArgumentException('Short out of range: ' . $n);
}
$this->out .= pack('n', $n);
return $this;
}
/**
* @param int $n
* @return $this
*/
public function write_signed_short($n)
{
if (($n < -32768) || ($n > 32767)) {
throw new AMQPInvalidArgumentException('Signed short out of range: ' . $n);
}
$this->out .= $this->correctEndianness(pack('s', $n));
return $this;
}
/**
* Write an integer as an unsigned 32-bit value
*
* @param int|string $n
* @return $this
*/
public function write_long($n)
{
if (($n < 0) || ($n > 4294967295)) {
throw new AMQPInvalidArgumentException('Long out of range: ' . $n);
}
//Numeric strings >PHP_INT_MAX on 32bit are casted to PHP_INT_MAX, damn PHP
if (!self::PLATFORM_64BIT && is_string($n)) {
$n = (float) $n;
}
$this->out .= pack('N', $n);
return $this;
}
/**
* @param int $n
* @return $this
*/
private function writeSignedLong($n)
{
if (($n < -2147483648) || ($n > 2147483647)) {
throw new AMQPInvalidArgumentException('Signed long out of range: ' . $n);
}
//on my 64bit debian this approach is slightly faster than splitIntoQuads()
$this->out .= $this->correctEndianness(pack('l', $n));
return $this;
}
/**
* Write a numeric value as an unsigned 64-bit value
*
* @param int|string $n
* @return $this
* @throws AMQPOutOfRangeException
*/
public function write_longlong($n)
{
if (is_int($n)) {
if ($n < 0) {
throw new AMQPOutOfRangeException('Longlong out of range: ' . $n);
}
if (self::PLATFORM_64BIT) {
$res = pack('J', $n);
$this->out .= $res;
} else {
$this->out .= pack('NN', 0, $n);
}
return $this;
}
$value = new BigInteger($n);
if (
$value->compare(self::getBigInteger('0')) < 0
|| $value->compare(self::getBigInteger('FFFFFFFFFFFFFFFF', 16)) > 0
) {
throw new AMQPInvalidArgumentException('Longlong out of range: ' . $n);
}
$value->setPrecision(64);
$this->out .= $value->toBytes();
return $this;
}
/**
* @param int|string $n
* @return $this
*/
public function write_signed_longlong($n)
{
if (is_int($n)) {
if (self::PLATFORM_64BIT) {
// q is for 64-bit signed machine byte order
$packed = pack('q', $n);
if (self::isLittleEndian()) {
$packed = $this->convertByteOrder($packed);
}
$this->out .= $packed;
} else {
$hi = $n < 0 ? -1 : 0;
$lo = $n;
$this->out .= pack('NN', $hi, $lo);
}
return $this;
}
$value = new BigInteger($n);
if (
$value->compare(self::getBigInteger('-8000000000000000', 16)) < 0
|| $value->compare(self::getBigInteger('7FFFFFFFFFFFFFFF', 16)) > 0
) {
throw new AMQPInvalidArgumentException('Signed longlong out of range: ' . $n);
}
$value->setPrecision(64);
$this->out .= substr($value->toBytes(true), -8);
return $this;
}
/**
* Write a string up to 255 bytes long after encoding.
* Assume UTF-8 encoding
*
* @param string $s
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_shortstr($s)
{
if ($s === null) {
$this->write_octet(0);
return $this;
}
$len = mb_strlen($s, 'ASCII');
if ($len > 255) {
throw new AMQPInvalidArgumentException('String too long');
}
$this->write_octet($len);
$this->out .= $s;
return $this;
}
/**
* Write a string up to 2**32 bytes long. Assume UTF-8 encoding
*
* @param string $s
* @return $this
*/
public function write_longstr($s)
{
if ($s === null) {
$this->write_long(0);
return $this;
}
$this->write_long(mb_strlen($s, 'ASCII'));
$this->out .= $s;
return $this;
}
/**
* Supports the writing of Array types, so that you can implement
* array methods, like Rabbitmq's HA parameters
*
* @param AMQPArray|array $a Instance of AMQPArray or PHP array WITHOUT format hints (unlike write_table())
* @return self
*/
public function write_array($a)
{
if (!($a instanceof AMQPArray)) {
$a = new AMQPArray($a);
}
$data = new self();
foreach ($a as $v) {
$data->writeValue($v[0], $v[1]);
}
$data = $data->getvalue();
$this->write_long(mb_strlen($data, 'ASCII'));
$this->write($data);
return $this;
}
/**
* Write unix time_t value as 64 bit timestamp
*
* @param int $v
* @return $this
*/
public function write_timestamp($v)
{
$this->write_longlong($v);
return $this;
}
/**
* Write PHP array, as table. Input array format: keys are strings,
* values are (type,value) tuples.
*
* @param AMQPTable|array $d Instance of AMQPTable or PHP array WITH format hints (unlike write_array())
* @return $this
* @throws \PhpAmqpLib\Exception\AMQPInvalidArgumentException
*/
public function write_table($d)
{
$typeIsSym = !($d instanceof AMQPTable); //purely for back-compat purposes
$table_data = new self();
foreach ($d as $k => $va) {
list($ftype, $v) = $va;
$table_data->write_shortstr($k);
$table_data->writeValue($typeIsSym ? AMQPAbstractCollection::getDataTypeForSymbol($ftype) : $ftype, $v);
}
$table_data = $table_data->getvalue();
$this->write_long(mb_strlen($table_data, 'ASCII'));
$this->write($table_data);
return $this;
}
/**
* for compat with method mapping used by AMQPMessage
*
* @param AMQPTable|array $d
* @return $this
*/
public function write_table_object($d)
{
return $this->write_table($d);
}
/**
* @param int $type One of AMQPAbstractCollection::T_* constants
* @param mixed $val
*/
private function writeValue($type, $val)
{
//This will find appropriate symbol for given data type for currently selected protocol
//Also will raise an exception on unknown type
$this->write(AMQPAbstractCollection::getSymbolForDataType($type));
switch ($type) {
case AMQPAbstractCollection::T_INT_SHORTSHORT:
$this->write_signed_octet($val);
break;
case AMQPAbstractCollection::T_INT_SHORTSHORT_U:
$this->write_octet($val);
break;
case AMQPAbstractCollection::T_INT_SHORT:
$this->write_signed_short($val);
break;
case AMQPAbstractCollection::T_INT_SHORT_U:
$this->write_short($val);
break;
case AMQPAbstractCollection::T_INT_LONG:
$this->writeSignedLong($val);
break;
case AMQPAbstractCollection::T_INT_LONG_U:
$this->write_long($val);
break;
case AMQPAbstractCollection::T_INT_LONGLONG:
$this->write_signed_longlong($val);
break;
case AMQPAbstractCollection::T_INT_LONGLONG_U:
$this->write_longlong($val);
break;
case AMQPAbstractCollection::T_DECIMAL:
$this->write_octet($val->getE());
$this->writeSignedLong($val->getN());
break;
case AMQPAbstractCollection::T_TIMESTAMP:
$this->write_timestamp($val);
break;
case AMQPAbstractCollection::T_BOOL:
$this->write_octet($val ? 1 : 0);
break;
case AMQPAbstractCollection::T_STRING_SHORT:
$this->write_shortstr($val);
break;
case AMQPAbstractCollection::T_STRING_LONG:
$this->write_longstr($val);
break;
case AMQPAbstractCollection::T_ARRAY:
$this->write_array($val);
break;
case AMQPAbstractCollection::T_TABLE:
$this->write_table($val);
break;
case AMQPAbstractCollection::T_VOID:
break;
case AMQPAbstractCollection::T_BYTES:
$this->write_longstr($val);
break;
default:
throw new AMQPInvalidArgumentException(sprintf(
'Unsupported type "%s"',
$type
));
}
}
}
<?php
namespace PhpAmqpLib\Wire;
abstract class Constants
{
const VERSION = '';
const AMQP_HEADER = '';
/**
* @var array<int, string>
*/
protected static $FRAME_TYPES = array();
/**
* @var array<int, string>
*/
protected static $CONTENT_METHODS = array();
/**
* @var array<int, string>
*/
protected static $CLOSE_METHODS = array();
/**
* @var array<string, string>
*/
public static $GLOBAL_METHOD_NAMES = array();
/**
* @return string
*/
public function getHeader()
{
return static::AMQP_HEADER;
}
/**
* @param int $type
* @return bool
*/
public function isFrameType($type)
{
return array_key_exists($type, static::$FRAME_TYPES);
}
/**
* @param int $type
* @return string
*/
public function getFrameType($type)
{
return static::$FRAME_TYPES[$type];
}
/**
* @param string $method
* @return bool
*/
public function isContentMethod($method)
{
return in_array($method, static::$CONTENT_METHODS, false);
}
/**
* @param string $method
* @return bool
*/
public function isCloseMethod($method)
{
return in_array($method, static::$CLOSE_METHODS, false);
}
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
final class Constants080 extends Constants
{
const VERSION = '8.0';
const AMQP_HEADER = "AMQP\x01\x01\x08\x00";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
4 => 'FRAME-OOB-METHOD',
5 => 'FRAME-OOB-HEADER',
6 => 'FRAME-OOB-BODY',
7 => 'FRAME-TRACE',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
4 => '70,50',
5 => '70,70',
6 => '80,40',
7 => '80,50',
8 => '80,60',
9 => '110,10',
10 => '120,40',
11 => '120,41',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,60',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.redirect',
'10,60' => 'Connection.close',
'10,61' => 'Connection.close_ok',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,30' => 'Channel.alert',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'70,10' => 'File.qos',
'70,11' => 'File.qos_ok',
'70,20' => 'File.consume',
'70,21' => 'File.consume_ok',
'70,30' => 'File.cancel',
'70,31' => 'File.cancel_ok',
'70,40' => 'File.open',
'70,41' => 'File.open_ok',
'70,50' => 'File.stage',
'70,60' => 'File.publish',
'70,70' => 'File.return',
'70,80' => 'File.deliver',
'70,90' => 'File.ack',
'70,100' => 'File.reject',
'80,10' => 'Stream.qos',
'80,11' => 'Stream.qos_ok',
'80,20' => 'Stream.consume',
'80,21' => 'Stream.consume_ok',
'80,30' => 'Stream.cancel',
'80,31' => 'Stream.cancel_ok',
'80,40' => 'Stream.publish',
'80,50' => 'Stream.return',
'80,60' => 'Stream.deliver',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'100,10' => 'Dtx.select',
'100,11' => 'Dtx.select_ok',
'100,20' => 'Dtx.start',
'100,21' => 'Dtx.start_ok',
'110,10' => 'Tunnel.request',
'120,10' => 'Test.integer',
'120,11' => 'Test.integer_ok',
'120,20' => 'Test.string',
'120,21' => 'Test.string_ok',
'120,30' => 'Test.table',
'120,31' => 'Test.table_ok',
'120,40' => 'Test.content',
'120,41' => 'Test.content_ok',
);
}
<?php
/* This file was autogenerated by spec/parser.php - Do not modify */
namespace PhpAmqpLib\Wire;
final class Constants091 extends Constants
{
const VERSION = '0.9.1';
const AMQP_HEADER = "AMQP\x00\x00\x09\x01";
/**
* @var array
*/
public static $FRAME_TYPES = array(
1 => 'FRAME-METHOD',
2 => 'FRAME-HEADER',
3 => 'FRAME-BODY',
8 => 'FRAME-HEARTBEAT',
4096 => 'FRAME-MIN-SIZE',
206 => 'FRAME-END',
501 => 'FRAME-ERROR',
);
/**
* @var array
*/
public static $CONTENT_METHODS = array(
0 => '60,40',
1 => '60,50',
2 => '60,60',
3 => '60,71',
);
/**
* @var array
*/
public static $CLOSE_METHODS = array(
0 => '10,50',
1 => '20,40',
);
/**
* @var array
*/
public static $GLOBAL_METHOD_NAMES = array(
'10,10' => 'Connection.start',
'10,11' => 'Connection.start_ok',
'10,20' => 'Connection.secure',
'10,21' => 'Connection.secure_ok',
'10,30' => 'Connection.tune',
'10,31' => 'Connection.tune_ok',
'10,40' => 'Connection.open',
'10,41' => 'Connection.open_ok',
'10,50' => 'Connection.close',
'10,51' => 'Connection.close_ok',
'10,60' => 'Connection.blocked',
'10,61' => 'Connection.unblocked',
'20,10' => 'Channel.open',
'20,11' => 'Channel.open_ok',
'20,20' => 'Channel.flow',
'20,21' => 'Channel.flow_ok',
'20,40' => 'Channel.close',
'20,41' => 'Channel.close_ok',
'30,10' => 'Access.request',
'30,11' => 'Access.request_ok',
'40,10' => 'Exchange.declare',
'40,11' => 'Exchange.declare_ok',
'40,20' => 'Exchange.delete',
'40,21' => 'Exchange.delete_ok',
'40,30' => 'Exchange.bind',
'40,31' => 'Exchange.bind_ok',
'40,40' => 'Exchange.unbind',
'40,51' => 'Exchange.unbind_ok',
'50,10' => 'Queue.declare',
'50,11' => 'Queue.declare_ok',
'50,20' => 'Queue.bind',
'50,21' => 'Queue.bind_ok',
'50,30' => 'Queue.purge',
'50,31' => 'Queue.purge_ok',
'50,40' => 'Queue.delete',
'50,41' => 'Queue.delete_ok',
'50,50' => 'Queue.unbind',
'50,51' => 'Queue.unbind_ok',
'60,10' => 'Basic.qos',
'60,11' => 'Basic.qos_ok',
'60,20' => 'Basic.consume',
'60,21' => 'Basic.consume_ok',
'60,30' => 'Basic.cancel',
'60,31' => 'Basic.cancel_ok',
'60,40' => 'Basic.publish',
'60,50' => 'Basic.return',
'60,60' => 'Basic.deliver',
'60,70' => 'Basic.get',
'60,71' => 'Basic.get_ok',
'60,72' => 'Basic.get_empty',
'60,80' => 'Basic.ack',
'60,90' => 'Basic.reject',
'60,100' => 'Basic.recover_async',
'60,110' => 'Basic.recover',
'60,111' => 'Basic.recover_ok',
'60,120' => 'Basic.nack',
'90,10' => 'Tx.select',
'90,11' => 'Tx.select_ok',
'90,20' => 'Tx.commit',
'90,21' => 'Tx.commit_ok',
'90,30' => 'Tx.rollback',
'90,31' => 'Tx.rollback_ok',
'85,10' => 'Confirm.select',
'85,11' => 'Confirm.select_ok',
);
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPIOWaitException;
use PhpAmqpLib\Wire\AMQPWriter;
abstract class AbstractIO
{
const BUFFER_SIZE = 8192;
/** @var null|AMQPConnectionConfig */
protected $config;
/** @var string */
protected $host;
/** @var int */
protected $port;
/** @var int|float */
protected $connection_timeout;
/** @var float */
protected $read_timeout;
/** @var float */
protected $write_timeout;
/** @var int */
protected $heartbeat;
/** @var int */
protected $initial_heartbeat;
/** @var bool */
protected $keepalive;
/** @var int|float */
protected $last_read;
/** @var int|float */
protected $last_write;
/** @var array|null */
protected $last_error;
/** @var bool */
protected $canDispatchPcntlSignal = false;
/**
* @param int $len
* @return string
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
*/
abstract public function read($len);
/**
* @param string $data
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPSocketException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
*/
abstract public function write($data);
/**
* @return void
*/
abstract public function close();
/**
* @param int|null $sec
* @param int $usec
* @return int
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function select(?int $sec, int $usec = 0)
{
$this->check_heartbeat();
$this->setErrorHandler();
try {
$result = $this->do_select($sec, $usec);
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPIOWaitException($e->getMessage(), $e->getCode(), $e);
} finally {
$this->restoreErrorHandler();
}
if ($this->canDispatchPcntlSignal) {
pcntl_signal_dispatch();
}
// no exception and false result - either timeout or signal was sent
if ($result === false) {
$result = 0;
}
return $result;
}
/**
* @param int|null $sec
* @param int $usec
* @return int|bool
* @throws AMQPConnectionClosedException
*/
abstract protected function do_select(?int $sec, int $usec);
/**
* Set ups the connection.
* @return void
* @throws \PhpAmqpLib\Exception\AMQPIOException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
abstract public function connect();
/**
* Set connection params connection tune(negotiation).
* @param int $heartbeat
*/
public function afterTune(int $heartbeat): void
{
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
}
/**
* Heartbeat logic: check connection health here
* @return void
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
*/
public function check_heartbeat()
{
// ignore unless heartbeat interval is set
if ($this->heartbeat !== 0 && $this->last_read > 0 && $this->last_write > 0) {
// server has gone away
$this->checkBrokerHeartbeat();
// time for client to send a heartbeat
$now = microtime(true);
if (($this->heartbeat / 2) < $now - $this->last_write) {
$this->write_heartbeat();
}
}
}
/**
* @throws \PhpAmqpLib\Exception\AMQPHeartbeatMissedException
*/
protected function checkBrokerHeartbeat()
{
if ($this->heartbeat > 0 && ($this->last_read > 0 || $this->last_write > 0)) {
$lastActivity = $this->getLastActivity();
$now = microtime(true);
if (($now - $lastActivity) > $this->heartbeat * 2 + 1) {
$this->close();
throw new AMQPHeartbeatMissedException('Missed server heartbeat');
}
}
}
/**
* @return float|int
*/
public function getLastActivity()
{
return max($this->last_read, $this->last_write);
}
public function getReadTimeout(): float
{
return $this->read_timeout;
}
/**
* @return $this
*/
public function disableHeartbeat()
{
$this->initial_heartbeat = $this->heartbeat;
$this->heartbeat = 0;
return $this;
}
/**
* @return $this
*/
public function reenableHeartbeat()
{
$this->heartbeat = $this->initial_heartbeat;
return $this;
}
/**
* Sends a heartbeat message
*/
protected function write_heartbeat()
{
$pkt = new AMQPWriter();
$pkt->write_octet(8);
$pkt->write_short(0);
$pkt->write_long(0);
$pkt->write_octet(0xCE);
$this->write($pkt->getvalue());
}
/**
* Begin tracking errors and set the error handler
*/
protected function setErrorHandler(): void
{
$this->last_error = null;
set_error_handler(array($this, 'error_handler'));
}
protected function throwOnError(): void
{
if ($this->last_error !== null) {
throw new \ErrorException(
$this->last_error['errstr'],
0,
$this->last_error['errno'],
$this->last_error['errfile'],
$this->last_error['errline']
);
}
}
protected function restoreErrorHandler(): void
{
restore_error_handler();
}
/**
* Internal error handler to deal with stream and socket errors.
*
* @param int $errno
* @param string $errstr
* @param string $errfile
* @param int $errline
* @param array $errcontext
* @return void
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
// throwing an exception in an error handler will halt execution
// set the last error and continue
$this->last_error = compact('errno', 'errstr', 'errfile', 'errline', 'errcontext');
}
/**
* @return bool
*/
protected function isPcntlSignalEnabled()
{
return extension_loaded('pcntl')
&& function_exists('pcntl_signal_dispatch')
&& (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true);
}
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Connection\AMQPConnectionConfig;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPSocketException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Helper\SocketConstants;
class SocketIO extends AbstractIO
{
/** @var null|resource */
private $sock;
/**
* @param string $host
* @param int $port
* @param int|float $read_timeout
* @param bool $keepalive
* @param int|float|null $write_timeout if null defaults to read timeout
* @param int $heartbeat how often to send heartbeat. 0 means off
* @param null|AMQPConnectionConfig $config
*/
public function __construct(
$host,
$port,
$read_timeout = 3,
$keepalive = false,
$write_timeout = null,
$heartbeat = 0,
?AMQPConnectionConfig $config = null
) {
$this->config = $config;
$this->host = $host;
$this->port = $port;
$this->read_timeout = (float)$read_timeout;
$this->write_timeout = (float)($write_timeout ?: $read_timeout);
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
$this->keepalive = $keepalive;
$this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
/*
TODO FUTURE enable this check
php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
if ($this->heartbeat !== 0 && ($this->read_timeout <= ($this->heartbeat * 2))) {
throw new \InvalidArgumentException('read_timeout must be greater than 2x the heartbeat');
}
if ($this->heartbeat !== 0 && ($this->write_timeout <= ($this->heartbeat * 2))) {
throw new \InvalidArgumentException('send_timeout must be greater than 2x the heartbeat');
}
*/
}
/**
* @inheritdoc
*/
public function connect()
{
$this->sock = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->write_timeout);
socket_set_option($this->sock, SOL_SOCKET, SO_SNDTIMEO, array('sec' => $sec, 'usec' => $uSec));
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
socket_set_option($this->sock, SOL_SOCKET, SO_RCVTIMEO, array('sec' => $sec, 'usec' => $uSec));
$this->setErrorHandler();
try {
$connected = socket_connect($this->sock, $this->host, $this->port);
$this->throwOnError();
} catch (\ErrorException $e) {
$connected = false;
} finally {
$this->restoreErrorHandler();
}
if (!$connected) {
$errno = socket_last_error($this->sock);
$errstr = socket_strerror($errno);
throw new AMQPIOException(sprintf(
'Error Connecting to server (%s): %s',
$errno,
$errstr
), $errno);
}
socket_set_block($this->sock);
socket_set_option($this->sock, SOL_TCP, TCP_NODELAY, 1);
if ($this->config && $this->config->getSendBufferSize() > 0) {
socket_set_option($this->sock, SOL_SOCKET, SO_SNDBUF, $this->config->getSendBufferSize());
}
if ($this->keepalive) {
$this->enable_keepalive();
}
$this->heartbeat = $this->initial_heartbeat;
}
/**
* @inheritdoc
*/
public function getSocket()
{
return $this->sock;
}
/**
* @inheritdoc
*/
public function read($len)
{
if (is_null($this->sock)) {
throw new AMQPSocketException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$this->check_heartbeat();
list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
$read_start = microtime(true);
$read = 0;
$data = '';
while ($read < $len) {
$buffer = null;
$result = socket_recv($this->sock, $buffer, $len - $read, 0);
if ($result === 0) {
// From linux recv() manual:
// When a stream socket peer has performed an orderly shutdown,
// the return value will be 0 (the traditional "end-of-file" return).
// http://php.net/manual/en/function.socket-recv.php#47182
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
if (empty($buffer)) {
$read_now = microtime(true);
$t_read = $read_now - $read_start;
if ($t_read > $this->read_timeout) {
throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
}
$this->select($timeout_sec, $timeout_uSec);
continue;
}
$read += mb_strlen($buffer, 'ASCII');
$data .= $buffer;
}
if (mb_strlen($data, 'ASCII') !== $len) {
throw new AMQPIOException(sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($data, 'ASCII'),
$len
));
}
$this->last_read = microtime(true);
return $data;
}
/**
* @inheritdoc
*/
public function write($data)
{
// Null sockets are invalid, throw exception
if (is_null($this->sock)) {
throw new AMQPSocketException(sprintf(
'Socket was null! Last SocketError was: %s',
socket_strerror(socket_last_error())
));
}
$this->checkBrokerHeartbeat();
$written = 0;
$len = mb_strlen($data, 'ASCII');
$write_start = microtime(true);
while ($written < $len) {
$this->setErrorHandler();
try {
$this->select_write();
$buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
$result = socket_write($this->sock, $buffer);
$this->throwOnError();
} catch (\ErrorException $e) {
$code = socket_last_error($this->sock);
$constants = SocketConstants::getInstance();
switch ($code) {
case $constants->SOCKET_EPIPE:
case $constants->SOCKET_ENETDOWN:
case $constants->SOCKET_ENETUNREACH:
case $constants->SOCKET_ENETRESET:
case $constants->SOCKET_ECONNABORTED:
case $constants->SOCKET_ECONNRESET:
case $constants->SOCKET_ECONNREFUSED:
case $constants->SOCKET_ETIMEDOUT:
$this->close();
throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
default:
throw new AMQPIOException(sprintf(
'Error sending data. Last SocketError: %s',
socket_strerror($code)
), $code, $e);
}
} finally {
$this->restoreErrorHandler();
}
if ($result === false) {
throw new AMQPIOException(sprintf(
'Error sending data. Last SocketError: %s',
socket_strerror(socket_last_error($this->sock))
));
}
$now = microtime(true);
if ($result > 0) {
$this->last_write = $write_start = $now;
$written += $result;
} else {
if (($now - $write_start) > $this->write_timeout) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
}
}
}
/**
* @inheritdoc
*/
public function close()
{
$this->disableHeartbeat();
if (is_resource($this->sock) || is_a($this->sock, \Socket::class)) {
socket_close($this->sock);
}
$this->sock = null;
$this->last_read = 0;
$this->last_write = 0;
}
/**
* @inheritdoc
*/
protected function do_select(?int $sec, int $usec)
{
if (!is_resource($this->sock) && !is_a($this->sock, \Socket::class)) {
$this->sock = null;
throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
}
$read = array($this->sock);
$write = null;
$except = null;
return socket_select($read, $write, $except, $sec, $usec);
}
/**
* @return int|bool
*/
protected function select_write()
{
$read = $except = null;
$write = array($this->sock);
return socket_select($read, $write, $except, 0, 100000);
}
/**
* @throws \PhpAmqpLib\Exception\AMQPIOException
*/
protected function enable_keepalive()
{
if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
}
socket_set_option($this->sock, SOL_SOCKET, SO_KEEPALIVE, 1);
}
/**
* @inheritdoc
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
$constants = SocketConstants::getInstance();
// socket_select warning that it has been interrupted by a signal - EINTR
if (isset($constants->SOCKET_EINTR) && false !== strrpos($errstr, socket_strerror($constants->SOCKET_EINTR))) {
// it's allowed while processing signals
return;
}
parent::error_handler($errno, $errstr, $errfile, $errline, $errcontext);
}
/**
* @inheritdoc
*/
protected function setErrorHandler(): void
{
parent::setErrorHandler();
socket_clear_error($this->sock);
}
}
<?php
namespace PhpAmqpLib\Wire\IO;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPDataReadException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\MiscHelper;
use PhpAmqpLib\Helper\SocketConstants;
class StreamIO extends AbstractIO
{
/** @var string */
protected $protocol;
/** @var null|resource */
protected $context;
/** @var null|resource */
private $sock;
/**
* @param string $host
* @param int $port
* @param float $connection_timeout
* @param float $read_write_timeout
* @param resource|array|null $context
* @param bool $keepalive
* @param int $heartbeat
* @param string|null $ssl_protocol
*/
public function __construct(
$host,
$port,
$connection_timeout,
$read_write_timeout,
$context = null,
$keepalive = false,
$heartbeat = 0,
$ssl_protocol = null
) {
// TODO FUTURE change comparison to <=
// php-amqplib/php-amqplib#648, php-amqplib/php-amqplib#666
/*
TODO FUTURE enable this check
if ($heartbeat !== 0 && ($read_write_timeout < ($heartbeat * 2))) {
throw new \InvalidArgumentException('read_write_timeout must be at least 2x the heartbeat');
}
*/
if (!is_resource($context) || get_resource_type($context) !== 'stream-context') {
$context = stream_context_create();
}
$this->protocol = 'tcp';
$this->host = $host;
$this->port = $port;
$this->connection_timeout = $connection_timeout;
$this->read_timeout = (float)$read_write_timeout;
$this->write_timeout = (float)$read_write_timeout;
$this->context = $context;
$this->keepalive = $keepalive;
$this->heartbeat = $heartbeat;
$this->initial_heartbeat = $heartbeat;
$this->canDispatchPcntlSignal = $this->isPcntlSignalEnabled();
stream_context_set_option($this->context, 'socket', 'tcp_nodelay', true);
$options = stream_context_get_options($this->context);
if (!empty($options['ssl'])) {
if (isset($ssl_protocol)) {
$this->protocol = $ssl_protocol;
} else {
$this->protocol = 'ssl';
}
}
}
/**
* @inheritdoc
*/
public function connect()
{
$errstr = $errno = null;
$remote = sprintf(
'%s://%s:%s',
$this->protocol,
$this->host,
$this->port
);
$this->setErrorHandler();
try {
$this->sock = stream_socket_client(
$remote,
$errno,
$errstr,
$this->connection_timeout,
STREAM_CLIENT_CONNECT,
$this->context
);
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPIOException($e->getMessage());
} finally {
$this->restoreErrorHandler();
}
if (false === $this->sock) {
throw new AMQPIOException(
sprintf(
'Error Connecting to server(%s): %s ',
$errno,
$errstr
),
$errno
);
}
if (!stream_socket_get_name($this->sock, true)) {
throw new AMQPIOException(
sprintf(
'Connection refused: %s ',
$remote
)
);
}
list($sec, $uSec) = MiscHelper::splitSecondsMicroseconds(max($this->read_timeout, $this->write_timeout));
if (!stream_set_timeout($this->sock, $sec, $uSec)) {
throw new AMQPIOException('Timeout could not be set');
}
// php cannot capture signals while streams are blocking
if ($this->canDispatchPcntlSignal) {
stream_set_blocking($this->sock, 0);
stream_set_write_buffer($this->sock, 0);
if (function_exists('stream_set_read_buffer')) {
stream_set_read_buffer($this->sock, 0);
}
} else {
stream_set_blocking($this->sock, true);
}
if ($this->keepalive) {
$this->enable_keepalive();
}
$this->heartbeat = $this->initial_heartbeat;
}
/**
* @inheritdoc
*/
public function read($len)
{
$this->check_heartbeat();
list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
$read_start = microtime(true);
$read = 0;
$data = '';
while ($read < $len) {
if (!is_resource($this->sock) || feof($this->sock)) {
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
$this->setErrorHandler();
try {
$buffer = fread($this->sock, ($len - $read));
$this->throwOnError();
} catch (\ErrorException $e) {
throw new AMQPDataReadException($e->getMessage(), $e->getCode(), $e);
} finally {
$this->restoreErrorHandler();
}
if ($buffer === false) {
throw new AMQPDataReadException('Error receiving data');
}
if ($buffer === '') {
$read_now = microtime(true);
$t_read = $read_now - $read_start;
if ($t_read > $this->read_timeout) {
throw new AMQPTimeoutException('Too many read attempts detected in StreamIO');
}
$this->select($timeout_sec, $timeout_uSec);
continue;
}
$this->last_read = microtime(true);
$read_start = $this->last_read;
$read += mb_strlen($buffer, 'ASCII');
$data .= $buffer;
}
if (mb_strlen($data, 'ASCII') !== $len) {
throw new AMQPDataReadException(
sprintf(
'Error reading data. Received %s instead of expected %s bytes',
mb_strlen($data, 'ASCII'),
$len
)
);
}
$this->last_read = microtime(true);
return $data;
}
/**
* @inheritdoc
*/
public function write($data)
{
$this->checkBrokerHeartbeat();
$written = 0;
$len = mb_strlen($data, 'ASCII');
$write_start = microtime(true);
while ($written < $len) {
if (!is_resource($this->sock) || feof($this->sock)) {
$this->close();
$constants = SocketConstants::getInstance();
throw new AMQPConnectionClosedException('Broken pipe or closed connection', $constants->SOCKET_EPIPE);
}
$result = false;
$this->setErrorHandler();
// OpenSSL's C library function SSL_write() can balk on buffers > 8192
// bytes in length, so we're limiting the write size here. On both TLS
// and plaintext connections, the write loop will continue until the
// buffer has been fully written.
// This behavior has been observed in OpenSSL dating back to at least
// September 2002:
// http://comments.gmane.org/gmane.comp.encryption.openssl.user/4361
try {
// check stream and prevent from high CPU usage
$this->select_write();
$buffer = mb_substr($data, $written, self::BUFFER_SIZE, 'ASCII');
$result = fwrite($this->sock, $buffer);
$this->throwOnError();
} catch (\ErrorException $e) {
$code = $this->last_error['errno'];
$constants = SocketConstants::getInstance();
switch ($code) {
case $constants->SOCKET_EPIPE:
case $constants->SOCKET_ENETDOWN:
case $constants->SOCKET_ENETUNREACH:
case $constants->SOCKET_ENETRESET:
case $constants->SOCKET_ECONNABORTED:
case $constants->SOCKET_ECONNRESET:
case $constants->SOCKET_ECONNREFUSED:
case $constants->SOCKET_ETIMEDOUT:
$this->close();
throw new AMQPConnectionClosedException(socket_strerror($code), $code, $e);
default:
throw new AMQPRuntimeException($e->getMessage(), $code, $e);
}
} finally {
$this->restoreErrorHandler();
}
if ($result === false) {
throw new AMQPRuntimeException('Error sending data');
}
if ($this->timed_out()) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
$now = microtime(true);
if ($result > 0) {
$this->last_write = $write_start = $now;
$written += $result;
} else {
if (feof($this->sock)) {
$this->close();
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
if (($now - $write_start) > $this->write_timeout) {
throw AMQPTimeoutException::writeTimeout($this->write_timeout);
}
}
}
}
/**
* @inheritdoc
*/
public function error_handler($errno, $errstr, $errfile, $errline, $errcontext = null)
{
$code = $this->extract_error_code($errstr);
$constants = SocketConstants::getInstance();
switch ($code) {
// fwrite notice that the stream isn't ready - EAGAIN or EWOULDBLOCK
case $constants->SOCKET_EAGAIN:
case $constants->SOCKET_EWOULDBLOCK:
// stream_select warning that it has been interrupted by a signal - EINTR
case $constants->SOCKET_EINTR:
return;
}
parent::error_handler($code > 0 ? $code : $errno, $errstr, $errfile, $errline, $errcontext);
}
public function close()
{
$this->disableHeartbeat();
if (is_resource($this->sock)) {
fclose($this->sock);
}
$this->sock = null;
$this->last_read = 0;
$this->last_write = 0;
}
/**
* @inheritdoc
*/
public function getSocket()
{
return $this->sock;
}
/**
* @inheritdoc
*/
protected function do_select(?int $sec, int $usec)
{
if ($this->sock === null || !is_resource($this->sock)) {
$this->sock = null;
throw new AMQPConnectionClosedException('Broken pipe or closed connection', 0);
}
$read = array($this->sock);
$write = null;
$except = null;
if ($sec === null && PHP_VERSION_ID >= 80100) {
$usec = 0;
}
return stream_select($read, $write, $except, $sec, $usec);
}
/**
* @return int|bool
*/
protected function select_write()
{
$read = $except = null;
$write = array($this->sock);
return stream_select($read, $write, $except, 0, 100000);
}
/**
* @return mixed
*/
protected function timed_out()
{
// get status of socket to determine whether or not it has timed out
$info = stream_get_meta_data($this->sock);
return $info['timed_out'];
}
/**
* @throws \PhpAmqpLib\Exception\AMQPIOException
*/
protected function enable_keepalive()
{
if ($this->protocol === 'ssl') {
throw new AMQPIOException('Can not enable keepalive: ssl connection does not support keepalive (#70939)');
}
if ($this->protocol === 'tls') {
throw new AMQPIOException('Can not enable keepalive: tls connection does not support keepalive (#70939)');
}
if (!function_exists('socket_import_stream')) {
throw new AMQPIOException('Can not enable keepalive: function socket_import_stream does not exist');
}
if (!defined('SOL_SOCKET') || !defined('SO_KEEPALIVE')) {
throw new AMQPIOException('Can not enable keepalive: SOL_SOCKET or SO_KEEPALIVE is not defined');
}
$socket = socket_import_stream($this->sock);
socket_set_option($socket, SOL_SOCKET, SO_KEEPALIVE, 1);
}
/**
* @param string $message
* @return int
*/
protected function extract_error_code($message)
{
if (0 === strpos($message, 'stream_select():')) {
$pattern = '/\s+\[(\d+)\]:\s+/';
} else {
$pattern = '/\s+errno=(\d+)\s+/';
}
$matches = array();
$result = preg_match($pattern, $message, $matches);
if ($result > 0) {
return (int)$matches[1];
}
return 0;
}
}