Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • Saransaran/php-class-project
  • sibidharan/php-class-project
  • Madhan1024/php-class-project
  • GopiKrishnan/photogram
  • Mhd_khalid/php-class-project
  • At_muthu__/php-class-project
  • jaganbhaskar155/php-class-project
  • hariharanrd/php-class-project
  • jasper715/php-class-project
  • hanuRakesh/photogram-project-main
  • Yuvaraj21/photogram
  • ram_rogers/php-class-project
  • Hihelloboy/php-class-project
  • Nadarajan/php-class-project
  • srisanthosh156/php-class-project
  • Buvaneshwaran.k/php-class-project
  • umarfarooq07/php-class-project
  • Dhanaprakash/php-class-project
  • jashwanth142003/php-class-project
  • Esakkiraja/php-class-project
  • Boomi/php-class-project
  • Kishore2071/php-class-project
  • Ram123raj/php-class-project
  • aswinkumar27/php-class-project
  • dhilipdhilip9655/php-class-project
  • Manikandam143/php-class-project
  • VikramS/php-class-project
  • ArnoldSam/php-class-project
  • gowthamapandi0008/php-class-project
  • d.barath7639/php-class-project
  • shyalandran/php-class-project
  • kiruba_432/php-class-project
  • razakias001/php-class-project
  • kannan.b2745/php-class-project
  • sathish236tsk/php-class-project
  • rii/php-class-project
  • jonathajh4k/php-class-project
  • Neelagandan_G/php-class-project
  • Tholkappiar2003/php-class-project
  • kamaleshselvam75/php-class-project
  • devapriyan/php-class-project
  • sanojahamed/php-class-project
  • rizwankendo/php-class-project
  • senthamilselvan18000/php-class-project
  • rajeshd01/php-class-project
  • Florence/php-class-project
  • vishnu191299/php-class-project
  • Rakeshrakki/php-class-project
  • sanjay057/php-class-project
  • amarsanthoshsanthosh/photogram-project-cp
  • md_ashmar/php-class-project
  • k.nandhishwaran777k/php-class-project
52 results
Show changes
Showing
with 1922 additions and 0 deletions
<?php
namespace PhpAmqpLib\Connection;
/**
* @deprecated AMQPLazySSLConnection can be lazy too. Use AMQPConnectionFactory with AMQPConnectionConfig::setIsLazy(true)
*/
class AMQPLazySSLConnection extends AMQPSSLConnection
{
/**
* @inheritDoc
*/
public function connectOnConstruct(): bool
{
return false;
}
/**
* @param string[][] $hosts
* @param string[] $options
* @return self
* @throws \Exception
* @deprecated Use ConnectionFactory
*/
public static function create_connection($hosts, $options = array())
{
if (count($hosts) > 1) {
throw new \RuntimeException('Lazy connection does not support multiple hosts');
}
return parent::create_connection($hosts, $options);
}
}
<?php
namespace PhpAmqpLib\Connection;
/**
* @deprecated AMQPSocketConnection can be lazy too. Use AMQPConnectionFactory with AMQPConnectionConfig::setIsLazy(true)
*/
class AMQPLazySocketConnection extends AMQPSocketConnection
{
/**
* @inheritDoc
*/
public function connectOnConstruct(): bool
{
return false;
}
/**
* @param string[][] $hosts
* @param string[] $options
* @return self
* @throws \Exception
* @deprecated Use ConnectionFactory
*/
public static function create_connection($hosts, $options = array())
{
if (count($hosts) > 1) {
throw new \RuntimeException('Lazy connection does not support multiple hosts');
}
return parent::create_connection($hosts, $options);
}
}
<?php
namespace PhpAmqpLib\Connection;
class AMQPSSLConnection extends AMQPStreamConnection
{
/**
* @param string $host
* @param int $port
* @param string $user
* @param string $password
* @param string $vhost
* @param array $ssl_options
* @param array $options
* @param string $ssl_protocol
* @param AMQPConnectionConfig|null $config
* @throws \Exception
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$ssl_options = array(),
$options = array(),
$ssl_protocol = 'ssl',
?AMQPConnectionConfig $config = null
) {
if (empty($ssl_options)) {
trigger_error('Using non-TLS instances of AMQPSSLConnection is deprecated and will be removed in version 4 of php-amqplib', E_USER_DEPRECATED);
$ssl_context = null;
} else {
$ssl_context = $this->createSslContext($ssl_options);
}
parent::__construct(
$host,
$port,
$user,
$password,
$vhost,
isset($options['insist']) ? $options['insist'] : false,
isset($options['login_method']) ? $options['login_method'] : 'AMQPLAIN',
isset($options['login_response']) ? $options['login_response'] : null,
isset($options['locale']) ? $options['locale'] : 'en_US',
isset($options['connection_timeout']) ? $options['connection_timeout'] : 3,
isset($options['read_write_timeout']) ? $options['read_write_timeout'] : 130,
$ssl_context,
isset($options['keepalive']) ? $options['keepalive'] : false,
isset($options['heartbeat']) ? $options['heartbeat'] : 0,
isset($options['channel_rpc_timeout']) ? $options['channel_rpc_timeout'] : 0.0,
$ssl_protocol,
$config
);
}
/**
* @deprecated Use ConnectionFactory
* @throws \Exception
*/
public static function try_create_connection($host, $port, $user, $password, $vhost, $options)
{
$ssl_options = isset($options['ssl_options']) ? $options['ssl_options'] : [];
return new static($host, $port, $user, $password, $vhost, $ssl_options, $options);
}
/**
* @param array $options
* @return resource
*/
private function createSslContext($options)
{
$ssl_context = stream_context_create();
foreach ($options as $k => $v) {
stream_context_set_option($ssl_context, 'ssl', $k, $v);
}
return $ssl_context;
}
}
<?php
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Wire\IO\SocketIO;
class AMQPSocketConnection extends AbstractConnection
{
/**
* @param string $host
* @param int $port
* @param string $user
* @param string $password
* @param string $vhost
* @param bool $insist
* @param string $login_method
* @param null $login_response @deprecated
* @param string $locale
* @param int|float $read_timeout
* @param bool $keepalive
* @param int $write_timeout
* @param int $heartbeat
* @param float $channel_rpc_timeout
* @param AMQPConnectionConfig|null $config
* @throws \Exception
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$read_timeout = 3,
$keepalive = false,
$write_timeout = 3,
$heartbeat = 0,
$channel_rpc_timeout = 0.0,
?AMQPConnectionConfig $config = null
) {
if ($channel_rpc_timeout > $read_timeout) {
throw new \InvalidArgumentException('channel RPC timeout must not be greater than I/O read timeout');
}
$io = new SocketIO($host, $port, $read_timeout, $keepalive, $write_timeout, $heartbeat, $config);
parent::__construct(
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$io,
$heartbeat,
max($read_timeout, $write_timeout),
$channel_rpc_timeout,
$config
);
}
/**
* @deprecated Use ConnectionFactory
* @throws \Exception
*/
protected static function try_create_connection($host, $port, $user, $password, $vhost, $options)
{
$insist = isset($options['insist']) ?
$options['insist'] : false;
$login_method = isset($options['login_method']) ?
$options['login_method'] : 'AMQPLAIN';
$login_response = isset($options['login_response']) ?
$options['login_response'] : null;
$locale = isset($options['locale']) ?
$options['locale'] : 'en_US';
$read_timeout = isset($options['read_timeout']) ?
$options['read_timeout'] : 3;
$keepalive = isset($options['keepalive']) ?
$options['keepalive'] : false;
$write_timeout = isset($options['write_timeout']) ?
$options['write_timeout'] : 3;
$heartbeat = isset($options['heartbeat']) ?
$options['heartbeat'] : 0;
$channel_rpc_timeout = isset($options['channel_rpc_timeout']) ?
$options['channel_rpc_timeout'] : 0.0;
return new static(
$host,
$port,
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$read_timeout,
$keepalive,
$write_timeout,
$heartbeat,
$channel_rpc_timeout
);
}
}
<?php
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Wire\IO\StreamIO;
class AMQPStreamConnection extends AbstractConnection
{
/**
* @param string $host
* @param int $port
* @param string $user
* @param string $password
* @param string $vhost
* @param bool $insist
* @param string $login_method
* @param null $login_response @deprecated
* @param string $locale
* @param float $connection_timeout
* @param float $read_write_timeout
* @param resource|array|null $context
* @param bool $keepalive
* @param int $heartbeat
* @param float $channel_rpc_timeout
* @param string|null $ssl_protocol
* @param AMQPConnectionConfig|null $config
* @throws \Exception
*/
public function __construct(
$host,
$port,
$user,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
$connection_timeout = 3.0,
$read_write_timeout = 3.0,
$context = null,
$keepalive = false,
$heartbeat = 0,
$channel_rpc_timeout = 0.0,
$ssl_protocol = null,
?AMQPConnectionConfig $config = null
) {
if ($channel_rpc_timeout > $read_write_timeout) {
throw new \InvalidArgumentException('channel RPC timeout must not be greater than I/O read-write timeout');
}
$io = new StreamIO(
$host,
$port,
$connection_timeout,
$read_write_timeout,
$context,
$keepalive,
$heartbeat,
$ssl_protocol
);
parent::__construct(
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$io,
$heartbeat,
$connection_timeout,
$channel_rpc_timeout,
$config
);
// save the params for the use of __clone, this will overwrite the parent
$this->construct_params = func_get_args();
}
/**
* @deprecated Use ConnectionFactory
* @throws \Exception
*/
protected static function try_create_connection($host, $port, $user, $password, $vhost, $options)
{
$insist = isset($options['insist']) ?
$options['insist'] : false;
$login_method = isset($options['login_method']) ?
$options['login_method'] : 'AMQPLAIN';
$login_response = isset($options['login_response']) ?
$options['login_response'] : null;
$locale = isset($options['locale']) ?
$options['locale'] : 'en_US';
$connection_timeout = isset($options['connection_timeout']) ?
$options['connection_timeout'] : 3.0;
$read_write_timeout = isset($options['read_write_timeout']) ?
$options['read_write_timeout'] : 3.0;
$context = isset($options['context']) ?
$options['context'] : null;
$keepalive = isset($options['keepalive']) ?
$options['keepalive'] : false;
$heartbeat = isset($options['heartbeat']) ?
$options['heartbeat'] : 60;
$channel_rpc_timeout = isset($options['channel_rpc_timeout']) ?
$options['channel_rpc_timeout'] : 0.0;
return new static(
$host,
$port,
$user,
$password,
$vhost,
$insist,
$login_method,
$login_response,
$locale,
$connection_timeout,
$read_write_timeout,
$context,
$keepalive,
$heartbeat,
$channel_rpc_timeout
);
}
}
<?php
namespace PhpAmqpLib\Connection;
use PhpAmqpLib\Channel\AbstractChannel;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Channel\Frame;
use PhpAmqpLib\Exception\AMQPConnectionClosedException;
use PhpAmqpLib\Exception\AMQPHeartbeatMissedException;
use PhpAmqpLib\Exception\AMQPInvalidFrameException;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPNoDataException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Exception\AMQPSocketException;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Helper\Assert;
use PhpAmqpLib\Package;
use PhpAmqpLib\Wire;
use PhpAmqpLib\Wire\AMQPReader;
use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Wire\AMQPWriter;
use PhpAmqpLib\Wire\IO\AbstractIO;
abstract class AbstractConnection extends AbstractChannel
{
/**
* @var array
* @internal
*/
public static $LIBRARY_PROPERTIES = array(
'product' => array('S', Package::NAME),
'platform' => array('S', 'PHP'),
'version' => array('S', Package::VERSION),
'information' => array('S', ''),
'copyright' => array('S', ''),
'capabilities' => array(
'F',
array(
'authentication_failure_close' => array('t', true),
'publisher_confirms' => array('t', true),
'consumer_cancel_notify' => array('t', true),
'exchange_exchange_bindings' => array('t', true),
'basic.nack' => array('t', true),
'connection.blocked' => array('t', true)
)
)
);
/**
* @var AMQPChannel[]|AbstractChannel[]
* @internal
*/
public $channels = array();
/** @var int */
protected $version_major;
/** @var int */
protected $version_minor;
/** @var array */
protected $server_properties;
/** @var array */
protected $mechanisms;
/** @var array */
protected $locales;
/** @var bool */
protected $wait_tune_ok;
/** @var string */
protected $known_hosts;
/** @var null|Wire\AMQPIOReader */
protected $input;
/** @var string */
protected $vhost;
/** @var bool */
protected $insist;
/** @var string */
protected $login_method;
/**
* @var null|string
*/
protected $login_response;
/** @var string */
protected $locale;
/** @var int */
protected $heartbeat;
/** @var float */
protected $last_frame;
/** @var int */
protected $channel_max = 65535;
/** @var int */
protected $frame_max = 131072;
/** @var array Constructor parameters for clone */
protected $construct_params;
/** @var bool Close the connection in destructor */
protected $close_on_destruct = true;
/** @var bool Maintain connection status */
protected $is_connected = false;
/** @var AbstractIO */
protected $io;
/** @var callable Handles connection blocking from the server */
private $connection_block_handler;
/** @var callable Handles connection unblocking from the server */
private $connection_unblock_handler;
/** @var int Connection timeout value*/
protected $connection_timeout;
/** @var AMQPConnectionConfig|null */
protected $config;
/**
* Circular buffer to speed up prepare_content().
* Max size limited by $prepare_content_cache_max_size.
*
* @var array
* @see prepare_content()
*/
private $prepare_content_cache = array();
/** @var int Maximal size of $prepare_content_cache */
private $prepare_content_cache_max_size = 100;
/**
* Maximum time to wait for channel operations, in seconds
* @var float $channel_rpc_timeout
*/
private $channel_rpc_timeout;
/**
* If connection is blocked due to the broker running low on resources.
* @var bool
*/
protected $blocked = false;
/**
* If a frame is currently being written
* @var bool
*/
protected $writing = false;
/**
* @param string $user
* @param string $password
* @param string $vhost
* @param bool $insist
* @param string $login_method
* @param null $login_response @deprecated
* @param string $locale
* @param AbstractIO $io
* @param int $heartbeat
* @param int|float $connection_timeout
* @param int|float $channel_rpc_timeout
* @param \PhpAmqpLib\Connection\AMQPConnectionConfig | null $config
* @throws \Exception
*/
public function __construct(
$user,
$password,
$vhost = '/',
$insist = false,
$login_method = 'AMQPLAIN',
$login_response = null,
$locale = 'en_US',
AbstractIO $io = null,
$heartbeat = 0,
$connection_timeout = 0,
$channel_rpc_timeout = 0.0,
?AMQPConnectionConfig $config = null
) {
if (is_null($io)) {
throw new \InvalidArgumentException('Argument $io cannot be null');
}
if ($config) {
$this->config = clone $config;
}
// save the params for the use of __clone
$this->construct_params = func_get_args();
$this->vhost = $vhost;
$this->insist = $insist;
$this->login_method = $login_method;
$this->locale = $locale;
$this->io = $io;
$this->heartbeat = max(0, (int)$heartbeat);
$this->connection_timeout = $connection_timeout;
$this->channel_rpc_timeout = $channel_rpc_timeout;
if ($user && $password) {
if ($login_method === 'PLAIN') {
$this->login_response = sprintf("\0%s\0%s", $user, $password);
} elseif ($login_method === 'AMQPLAIN') {
$login_response = new AMQPWriter();
$login_response->write_table(array(
'LOGIN' => array('S', $user),
'PASSWORD' => array('S', $password)
));
// Skip the length
$responseValue = $login_response->getvalue();
$this->login_response = mb_substr($responseValue, 4, mb_strlen($responseValue, 'ASCII') - 4, 'ASCII');
} else {
throw new \InvalidArgumentException('Unknown login method: ' . $login_method);
}
} elseif ($login_method === 'EXTERNAL') {
$this->login_response = $login_response;
} else {
$this->login_response = null;
}
// Lazy Connection waits on connecting
if ($this->connectOnConstruct()) {
$this->connect();
}
}
/**
* Connects to the AMQP server
* @throws \Exception
*/
protected function connect()
{
$this->blocked = false;
try {
// Loop until we connect
while (!$this->isConnected()) {
// Assume we will connect, until we dont
$this->setIsConnected(true);
// Connect the socket
$this->io->connect();
$this->channels = array();
// The connection object itself is treated as channel 0
parent::__construct($this, 0);
$this->input = new Wire\AMQPIOReader($this->io);
$this->write($this->constants->getHeader());
// assume frame was sent successfully, used in $this->wait_channel()
$this->last_frame = microtime(true);
$this->wait(array($this->waitHelper->get_wait('connection.start')), false, $this->connection_timeout);
$this->x_start_ok(
$this->getLibraryProperties(),
$this->login_method,
$this->login_response,
$this->locale
);
$this->wait_tune_ok = true;
while ($this->wait_tune_ok) {
$this->wait(array(
$this->waitHelper->get_wait('connection.secure'),
$this->waitHelper->get_wait('connection.tune')
), false, $this->connection_timeout);
}
$host = $this->x_open($this->vhost, '', $this->insist);
if (!$host) {
//Reconnected
$this->io->reenableHeartbeat();
return null; // we weren't redirected
}
$this->setIsConnected(false);
$this->closeChannels();
// we were redirected, close the socket, loop and try again
$this->close_socket();
}
} catch (\Exception $e) {
// Something went wrong, set the connection status
$this->setIsConnected(false);
$this->closeChannels();
$this->close_input();
$this->close_socket();
throw $e; // Rethrow exception
}
}
/**
* Reconnects using the original connection settings.
* This will not recreate any channels that were established previously
* @throws \Exception
*/
public function reconnect()
{
// Try to close the AMQP connection
$this->safeClose();
// Reconnect the socket/stream then AMQP
$this->io->close();
// getIO can initiate the connection setting via LazyConnection, set it here to be sure
$this->setIsConnected(false);
$this->connect();
}
/**
* Cloning will use the old properties to make a new connection to the same server
*/
public function __clone()
{
if ($this->config) {
$this->config = clone $this->config;
}
call_user_func_array(array($this, '__construct'), $this->construct_params);
}
public function __destruct()
{
if ($this->close_on_destruct) {
$this->safeClose();
}
}
/**
* Attempts to close the connection safely
*/
protected function safeClose()
{
try {
if (null !== $this->input) {
$this->close();
}
} catch (\Exception $e) {
// Nothing here
}
}
/**
* @param int|null $sec
* @param int $usec
* @return int
* @throws AMQPIOException
* @throws AMQPRuntimeException
* @throws AMQPConnectionClosedException
* @throws AMQPRuntimeException
*/
public function select(?int $sec, int $usec = 0): int
{
try {
return $this->io->select($sec, $usec);
} catch (AMQPConnectionClosedException $e) {
$this->do_close();
throw $e;
} catch (AMQPRuntimeException $e) {
$this->setIsConnected(false);
throw $e;
}
}
/**
* Allows to not close the connection
* it's useful after the fork when you don't want to close parent process connection
*
* @param bool $close
*/
public function set_close_on_destruct($close = true)
{
$this->close_on_destruct = (bool) $close;
}
protected function close_input()
{
$this->debug && $this->debug->debug_msg('closing input');
if (null !== $this->input) {
$this->input->close();
$this->input = null;
}
}
protected function close_socket()
{
$this->debug && $this->debug->debug_msg('closing socket');
$this->io->close();
}
/**
* @param string $data
* @throws AMQPIOException
*/
public function write($data)
{
$this->debug->debug_hexdump($data);
try {
$this->writing = true;
$this->io->write($data);
} catch (AMQPConnectionClosedException $e) {
$this->do_close();
throw $e;
} catch (AMQPRuntimeException $e) {
$this->setIsConnected(false);
throw $e;
} finally {
$this->writing = false;
}
}
protected function do_close()
{
$this->frame_queue = new \SplQueue();
$this->method_queue = [];
$this->setIsConnected(false);
$this->close_input();
$this->close_socket();
}
/**
* @return int
* @throws AMQPRuntimeException
*/
public function get_free_channel_id()
{
for ($i = 1; $i <= $this->channel_max; $i++) {
if (!isset($this->channels[$i])) {
return $i;
}
}
throw new AMQPRuntimeException('No free channel ids');
}
/**
* @param int $channel
* @param int $class_id
* @param int $weight
* @param int $body_size
* @param string $packed_properties
* @param string $body
* @param AMQPWriter $pkt
* @throws AMQPIOException
*/
public function send_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
{
$this->prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt);
$this->write($pkt->getvalue());
}
/**
* Returns a new AMQPWriter or mutates the provided $pkt
*
* @param int $channel
* @param int $class_id
* @param int $weight
* @param int $body_size
* @param string $packed_properties
* @param string $body
* @param AMQPWriter|null $pkt
* @return AMQPWriter
*/
public function prepare_content($channel, $class_id, $weight, $body_size, $packed_properties, $body, $pkt)
{
$pkt = $pkt ?: new AMQPWriter();
// Content already prepared ?
$key_cache = sprintf(
'%s|%s|%s|%s',
$channel,
$packed_properties,
$class_id,
$weight
);
if (!isset($this->prepare_content_cache[$key_cache])) {
$w = new AMQPWriter();
$w->write_octet(2);
$w->write_short($channel);
$w->write_long(mb_strlen($packed_properties, 'ASCII') + 12);
$w->write_short($class_id);
$w->write_short($weight);
$this->prepare_content_cache[$key_cache] = $w->getvalue();
if (count($this->prepare_content_cache) > $this->prepare_content_cache_max_size) {
reset($this->prepare_content_cache);
$old_key = key($this->prepare_content_cache);
unset($this->prepare_content_cache[$old_key]);
}
}
$pkt->write($this->prepare_content_cache[$key_cache]);
$pkt->write_longlong($body_size);
$pkt->write($packed_properties);
$pkt->write_octet(0xCE);
// memory efficiency: walk the string instead of biting
// it. good for very large packets (close in size to
// memory_limit setting)
$position = 0;
$bodyLength = mb_strlen($body, 'ASCII');
while ($position < $bodyLength) {
$payload = mb_substr($body, $position, $this->frame_max - 8, 'ASCII');
$position += $this->frame_max - 8;
$pkt->write_octet(3);
$pkt->write_short($channel);
$pkt->write_long(mb_strlen($payload, 'ASCII'));
$pkt->write($payload);
$pkt->write_octet(0xCE);
}
return $pkt;
}
/**
* @param int $channel
* @param array $method_sig
* @param AMQPWriter|string $args
* @param null $pkt
* @throws AMQPIOException
*/
protected function send_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
$pkt = $this->prepare_channel_method_frame($channel, $method_sig, $args, $pkt);
$this->write($pkt->getvalue());
$this->debug->debug_method_signature1($method_sig);
}
/**
* Returns a new AMQPWriter or mutates the provided $pkt
*
* @param int $channel
* @param array $method_sig
* @param AMQPWriter|string $args
* @param AMQPWriter|null $pkt
* @return AMQPWriter
*/
protected function prepare_channel_method_frame($channel, $method_sig, $args = '', $pkt = null)
{
if ($args instanceof AMQPWriter) {
$args = $args->getvalue();
}
$pkt = $pkt ?: new AMQPWriter();
$pkt->write_octet(1);
$pkt->write_short($channel);
$pkt->write_long(mb_strlen($args, 'ASCII') + 4); // 4 = length of class_id and method_id
// in payload
$pkt->write_short($method_sig[0]); // class_id
$pkt->write_short($method_sig[1]); // method_id
$pkt->write($args);
$pkt->write_octet(0xCE);
$this->debug->debug_method_signature1($method_sig);
return $pkt;
}
/**
* Waits for a frame from the server
*
* @param int|float|null $timeout
* @return Frame
* @throws \Exception
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws AMQPRuntimeException
*/
protected function wait_frame($timeout = 0): Frame
{
if (null === $this->input) {
$this->setIsConnected(false);
throw new AMQPConnectionClosedException('Broken pipe or closed connection');
}
$currentTimeout = $this->input->getTimeout();
$this->input->setTimeout($timeout);
try {
$header = $this->input->readFrameHeader();
$frame_type = $header['type'];
if (!$this->constants->isFrameType($frame_type)) {
throw new AMQPInvalidFrameException('Invalid frame type ' . $frame_type);
}
$size = $header['size'];
// payload + ch
$result = unpack('a' . $size . 'payload/Cch', $this->input->read(AMQPReader::OCTET + $size));
$ch = $result['ch'];
$frame = new Frame($frame_type, $header['channel'], $size, $result['payload']);
} catch (AMQPTimeoutException $e) {
if ($this->input) {
$this->input->setTimeout($currentTimeout);
}
throw $e;
} catch (AMQPNoDataException $e) {
if ($this->input) {
$this->input->setTimeout($currentTimeout);
}
throw $e;
} catch (AMQPConnectionClosedException $exception) {
$this->do_close();
throw $exception;
} finally {
if ($this->input) {
$this->input->setTimeout($currentTimeout);
}
}
$this->input->setTimeout($currentTimeout);
if ($ch !== Frame::END) {
throw new AMQPInvalidFrameException(sprintf(
'Framing error, unexpected byte: %x',
$ch
));
}
return $frame;
}
/**
* Waits for a frame from the server destined for a particular channel.
*
* @param int $channel_id
* @param int|float|null $timeout
* @return Frame
* @throws \Exception
*/
protected function wait_channel(int $channel_id, $timeout = 0): Frame
{
// Keeping the original timeout unchanged.
$_timeout = $timeout;
while (true) {
$start = microtime(true);
try {
$frame = $this->wait_frame($_timeout);
} catch (AMQPTimeoutException $e) {
if (
$this->heartbeat && $this->last_frame
&& microtime(true) - ($this->heartbeat * 2) > $this->last_frame
) {
$this->debug->debug_msg('missed server heartbeat (at threshold * 2)');
$this->setIsConnected(false);
throw new AMQPHeartbeatMissedException('Missed server heartbeat');
}
throw $e;
}
$this->last_frame = microtime(true);
$frame_channel = $frame->getChannel();
if ($frame_channel === 0 && $frame->isHeartbeat()) {
// skip heartbeat frames and reduce the timeout by the time passed
$this->debug->debug_msg('received server heartbeat');
if ($_timeout > 0) {
$_timeout -= $this->last_frame - $start;
if ($_timeout <= 0) {
// If timeout has been reached, throw the exception without calling wait_frame
throw new AMQPTimeoutException('Timeout waiting on channel');
}
}
continue;
}
if ($frame_channel === $channel_id) {
return $frame;
}
// Not the channel we were looking for. Queue this frame
//for later, when the other channel is looking for frames.
// Make sure the channel still exists, it could have been
// closed by a previous Exception.
if (isset($this->channels[$frame_channel])) {
$this->channels[$frame_channel]->frame_queue->enqueue($frame);
}
// If we just queued up a method for channel 0 (the Connection
// itself) it's probably a close method in reaction to some
// error, so deal with it right away.
if ($frame_channel === 0 && $frame->isMethod()) {
$this->wait();
}
}
}
/**
* Fetches a channel object identified by the numeric channel_id, or
* create that object if it doesn't already exist.
*
* @param int|null $channel_id
* @return AMQPChannel
* @throws \PhpAmqpLib\Exception\AMQPOutOfBoundsException
* @throws \PhpAmqpLib\Exception\AMQPRuntimeException
* @throws \PhpAmqpLib\Exception\AMQPTimeoutException
* @throws \PhpAmqpLib\Exception\AMQPConnectionClosedException
*/
public function channel($channel_id = null)
{
if (!$this->is_connected) {
$this->connect();
}
if (isset($this->channels[$channel_id])) {
return $this->channels[$channel_id];
}
$channel_id = $channel_id ?: $this->get_free_channel_id();
$ch = new AMQPChannel($this, $channel_id, true, $this->channel_rpc_timeout);
$this->channels[$channel_id] = $ch;
return $ch;
}
/**
* Requests a connection close
*
* @param int $reply_code
* @param string $reply_text
* @param array $method_sig
* @return mixed|null
* @throws \Exception
*/
public function close($reply_code = 0, $reply_text = '', $method_sig = array(0, 0))
{
$this->io->disableHeartbeat();
if (empty($this->protocolWriter) || !$this->isConnected()) {
return null;
}
$result = null;
try {
$this->closeChannels();
list($class_id, $method_id, $args) = $this->protocolWriter->connectionClose(
$reply_code,
$reply_text,
$method_sig[0],
$method_sig[1]
);
$this->send_method_frame(array($class_id, $method_id), $args);
$result = $this->wait(
array($this->waitHelper->get_wait('connection.close_ok')),
false,
$this->connection_timeout
);
} catch (\Exception $exception) {
$this->do_close();
throw $exception;
}
$this->setIsConnected(false);
return $result;
}
/**
* @param AMQPReader $reader
* @throws AMQPConnectionClosedException
*/
protected function connection_close(AMQPReader $reader)
{
$code = (int)$reader->read_short();
$reason = $reader->read_shortstr();
$class = $reader->read_short();
$method = $reader->read_short();
$reason .= sprintf('(%s, %s)', $class, $method);
$this->x_close_ok();
throw new AMQPConnectionClosedException($reason, $code);
}
/**
* Confirms a connection close
*/
protected function x_close_ok()
{
$this->send_method_frame(
explode(',', $this->waitHelper->get_wait('connection.close_ok'))
);
$this->do_close();
}
/**
* Confirm a connection close
*/
protected function connection_close_ok()
{
$this->do_close();
}
/**
* @param string $virtual_host
* @param string $capabilities
* @param bool $insist
* @return mixed
*/
protected function x_open($virtual_host, $capabilities = '', $insist = false)
{
$args = new AMQPWriter();
$args->write_shortstr($virtual_host);
$args->write_shortstr($capabilities);
$args->write_bits(array($insist));
$this->send_method_frame(array(10, 40), $args);
$wait = array(
$this->waitHelper->get_wait('connection.open_ok')
);
if ($this->protocolVersion === Wire\Constants080::VERSION) {
$wait[] = $this->waitHelper->get_wait('connection.redirect');
}
return $this->wait($wait, false, $this->connection_timeout);
}
/**
* Signals that the connection is ready
*
* @param AMQPReader $args
*/
protected function connection_open_ok($args)
{
$this->known_hosts = $args->read_shortstr();
$this->debug->debug_msg('Open OK! known_hosts: ' . $this->known_hosts);
}
/**
* Asks the client to use a different server
*
* @param AMQPReader $args
* @return string
*/
protected function connection_redirect($args)
{
$host = $args->read_shortstr();
$this->known_hosts = $args->read_shortstr();
$this->debug->debug_msg(sprintf(
'Redirected to [%s], known_hosts [%s]',
$host,
$this->known_hosts
));
return $host;
}
/**
* Security mechanism challenge
*
* @param AMQPReader $args
*/
protected function connection_secure($args)
{
$args->read_longstr();
}
/**
* Security mechanism response
*
* @param string $response
*/
protected function x_secure_ok($response)
{
$args = new AMQPWriter();
$args->write_longstr($response);
$this->send_method_frame(array(10, 21), $args);
}
/**
* Starts connection negotiation
*
* @param AMQPReader $args
*/
protected function connection_start($args)
{
$this->version_major = $args->read_octet();
$this->version_minor = $args->read_octet();
$this->server_properties = $args->read_table();
$this->mechanisms = explode(' ', $args->read_longstr());
$this->locales = explode(' ', $args->read_longstr());
$this->debug->debug_connection_start(
$this->version_major,
$this->version_minor,
$this->server_properties,
$this->mechanisms,
$this->locales
);
}
/**
* @param AMQPTable|array $clientProperties
* @param string $mechanism
* @param string $response
* @param string $locale
*/
protected function x_start_ok($clientProperties, $mechanism, $response, $locale)
{
$args = new AMQPWriter();
$args->write_table($clientProperties);
$args->write_shortstr($mechanism);
$args->write_longstr($response);
$args->write_shortstr($locale);
$this->send_method_frame(array(10, 11), $args);
}
/**
* Proposes connection tuning parameters
*
* @param AMQPReader $args
*/
protected function connection_tune($args)
{
$v = $args->read_short();
if ($v) {
$this->channel_max = $v;
}
$v = $args->read_long();
if ($v) {
$this->frame_max = (int)$v;
}
// @see https://www.rabbitmq.com/heartbeats.html
// If either value is 0 (see below), the greater value of the two is used
// Otherwise the smaller value of the two is used
// A zero value indicates that a peer suggests disabling heartbeats entirely.
// To disable heartbeats, both peers have to opt in and use the value of 0
// For BC, this library opts for disabled heartbeat if client value is 0.
$v = $args->read_short();
if ($this->heartbeat > 0) {
$this->heartbeat = min($this->heartbeat, $v);
}
$this->x_tune_ok($this->channel_max, $this->frame_max, $this->heartbeat);
$this->io->afterTune($this->heartbeat);
}
/**
* Negotiates connection tuning parameters
*
* @param int $channel_max
* @param int $frame_max
* @param int $heartbeat
*/
protected function x_tune_ok($channel_max, $frame_max, $heartbeat)
{
$args = new AMQPWriter();
$args->write_short($channel_max);
$args->write_long($frame_max);
$args->write_short($heartbeat);
$this->send_method_frame(array(10, 31), $args);
$this->wait_tune_ok = false;
}
/**
* @return AbstractIO
* @deprecated
*/
public function getIO()
{
return $this->io;
}
/**
* Check connection heartbeat if enabled.
* @throws AMQPHeartbeatMissedException If too much time passed since last connection activity.
* @throws AMQPConnectionClosedException If connection was closed due to network issues or timeouts.
* @throws AMQPSocketException If connection was already closed.
* @throws AMQPTimeoutException If heartbeat write takes too much time.
* @throws AMQPIOException If other connection problems occurred.
*/
public function checkHeartBeat()
{
$this->io->check_heartbeat();
}
/**
* @return float|int
*/
public function getLastActivity()
{
return $this->io->getLastActivity();
}
/**
* @return float
* @since 3.2.0
*/
public function getReadTimeout(): float
{
return $this->io->getReadTimeout();
}
/**
* Handles connection blocked notifications
*
* @param AMQPReader $args
*/
protected function connection_blocked(AMQPReader $args)
{
$this->blocked = true;
// Call the block handler and pass in the reason
$this->dispatch_to_handler($this->connection_block_handler, array($args->read_shortstr()));
}
/**
* Handles connection unblocked notifications
*/
protected function connection_unblocked()
{
$this->blocked = false;
// No args to an unblock event
$this->dispatch_to_handler($this->connection_unblock_handler);
}
/**
* Sets a handler which is called whenever a connection.block is sent from the server
*
* @param callable $callback
* @throws \InvalidArgumentException if $callback is not callable
*/
public function set_connection_block_handler($callback)
{
Assert::isCallable($callback);
$this->connection_block_handler = $callback;
}
/**
* Sets a handler which is called whenever a connection.block is sent from the server
*
* @param callable $callback
* @throws \InvalidArgumentException if $callback is not callable
*/
public function set_connection_unblock_handler($callback)
{
Assert::isCallable($callback);
$this->connection_unblock_handler = $callback;
}
/**
* Gets the connection status
*
* @return bool
*/
public function isConnected()
{
return $this->is_connected;
}
/**
* Get the connection blocked state.
* @return bool
* @since 2.12.0
*/
public function isBlocked()
{
return $this->blocked;
}
/**
* Get the io writing state.
* @return bool
*/
public function isWriting()
{
return $this->writing;
}
/**
* Set the connection status
*
* @param bool $is_connected
*/
protected function setIsConnected($is_connected)
{
$this->is_connected = (bool) $is_connected;
}
/**
* Closes all available channels
*/
protected function closeChannels()
{
foreach ($this->channels as $key => $channel) {
// channels[0] is this connection object, so don't close it yet
if ($key === 0) {
continue;
}
try {
$channel->close();
} catch (\Exception $e) {
/* Ignore closing errors */
}
}
}
/**
* Should the connection be attempted during construction?
*
* @return bool
*/
public function connectOnConstruct(): bool
{
if ($this->config) {
return !$this->config->isLazy();
}
return true;
}
/**
* @return array
*/
public function getServerProperties()
{
return $this->server_properties;
}
/**
* @return int
*/
public function getHeartbeat()
{
return $this->heartbeat;
}
/**
* Get the library properties for populating the client protocol information
*
* @return array
*/
public function getLibraryProperties()
{
$config = self::$LIBRARY_PROPERTIES;
if ($this->config !== null) {
$connectionName = $this->config->getConnectionName();
if ($connectionName !== '') {
$config['connection_name'] = ['S', $connectionName];
}
}
return $config;
}
/**
* @param array $hosts
* @param array $options
*
* @return mixed
* @throws \Exception
* @deprecated Use AMQPConnectionFactory.
*/
public static function create_connection($hosts, $options = array())
{
if (!is_array($hosts) || count($hosts) < 1) {
throw new \InvalidArgumentException(
'An array of hosts are required when attempting to create a connection'
);
}
foreach ($hosts as $hostdef) {
self::validate_host($hostdef);
$host = $hostdef['host'];
$port = $hostdef['port'];
$user = $hostdef['user'];
$password = $hostdef['password'];
$vhost = isset($hostdef['vhost']) ? $hostdef['vhost'] : '/';
try {
$conn = static::try_create_connection($host, $port, $user, $password, $vhost, $options);
return $conn;
} catch (\Exception $e) {
$latest_exception = $e;
}
}
throw $latest_exception;
}
public static function validate_host($host)
{
if (!isset($host['host'])) {
throw new \InvalidArgumentException("'host' key is required.");
}
if (!isset($host['port'])) {
throw new \InvalidArgumentException("'port' key is required.");
}
if (!isset($host['user'])) {
throw new \InvalidArgumentException("'user' key is required.");
}
if (!isset($host['password'])) {
throw new \InvalidArgumentException("'password' key is required.");
}
}
}
<?php
namespace PhpAmqpLib\Connection\Heartbeat;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
/**
* Manages pcntl-based heartbeat sending for a {@link AbstractConnection}.
*/
abstract class AbstractSignalHeartbeatSender
{
/**
* @var AbstractConnection|null
*/
protected $connection;
/**
* @param AbstractConnection $connection
* @throws AMQPRuntimeException
*/
public function __construct(AbstractConnection $connection)
{
if (!$this->isSupported()) {
throw new AMQPRuntimeException('Signal-based heartbeat sender is unsupported');
}
$this->connection = $connection;
}
public function __destruct()
{
$this->unregister();
}
/**
* @return bool
*/
protected function isSupported(): bool
{
return extension_loaded('pcntl')
&& function_exists('pcntl_async_signals')
&& (defined('AMQP_WITHOUT_SIGNALS') ? !AMQP_WITHOUT_SIGNALS : true);
}
/**
* Starts the heartbeats
*/
abstract public function register(): void;
/**
* Stops the heartbeats.
*/
abstract public function unregister(): void;
/**
* Handles the heartbeat when a signal interrupt is received
*
* @param int $interval
*/
protected function handleSignal(int $interval): void
{
if (!$this->connection) {
return;
}
if (!$this->connection->isConnected()) {
$this->unregister();
return;
}
if ($this->connection->isWriting()) {
return;
}
if (time() > ($this->connection->getLastActivity() + $interval)) {
$this->connection->checkHeartBeat();
}
}
}
<?php
namespace PhpAmqpLib\Connection\Heartbeat;
use PhpAmqpLib\Exception\AMQPRuntimeException;
/**
* @see AbstractSignalHeartbeatSender
*
* This version of a signal based heartbeat sendler relies on using SIGALRM and uses the OS to trigger an alarm
* after a given time.
*/
final class PCNTLHeartbeatSender extends AbstractSignalHeartbeatSender
{
public function register(): void
{
if (!$this->connection) {
throw new AMQPRuntimeException('Unable to re-register heartbeat sender');
}
if (!$this->connection->isConnected()) {
throw new AMQPRuntimeException('Unable to register heartbeat sender, connection is not active');
}
$timeout = $this->connection->getHeartbeat();
if ($timeout > 0) {
$interval = (int)ceil($timeout / 2);
pcntl_async_signals(true);
$this->registerListener($interval);
pcntl_alarm($interval);
}
}
public function unregister(): void
{
$this->connection = null;
// restore default signal handler
pcntl_signal(SIGALRM, SIG_IGN);
}
private function registerListener(int $interval): void
{
pcntl_signal(SIGALRM, function () use ($interval) {
$this->handleSignal($interval);
if ($this->connection) {
pcntl_alarm($interval);
}
}, true);
}
}
<?php
namespace PhpAmqpLib\Connection\Heartbeat;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Exception\AMQPRuntimeException;
/**
* @see AbstractSignalHeartbeatSender
* @since 3.2.0
*
* This version of a signal based heartbeat sender allows using any signal number. It forks the current process
* to create a child process that periodically sends a signal to the parent process.
* The default signal used is SIGUSR1
*/
final class SIGHeartbeatSender extends AbstractSignalHeartbeatSender
{
/**
* @var int the UNIX signal to be used for managing heartbeats
*/
private $signal;
/**
* @var int|null the PID (process ID) of the child process sending regular signals to manage heartbeats
*/
private $childPid;
/**
* @param AbstractConnection $connection
* @param int $signal
* @throws AMQPRuntimeException
*/
public function __construct(AbstractConnection $connection, int $signal = SIGUSR1)
{
parent::__construct($connection);
$this->signal = $signal;
}
public function register(): void
{
if (!$this->connection) {
throw new AMQPRuntimeException('Unable to re-register heartbeat sender');
}
if (!$this->connection->isConnected()) {
throw new AMQPRuntimeException('Unable to register heartbeat sender, connection is not active');
}
$timeout = $this->connection->getHeartbeat();
if ($timeout > 0) {
$interval = (int)ceil($timeout / 2);
$this->registerListener($interval);
}
}
public function unregister(): void
{
$this->connection = null;
// restore default signal handler
pcntl_signal($this->signal, SIG_IGN);
if ($this->childPid > 0) {
posix_kill($this->childPid, SIGKILL);
}
$this->childPid = null;
}
private function registerListener(int $interval): void
{
pcntl_async_signals(true);
$this->periodicAlarm($interval);
pcntl_signal($this->signal, function () use ($interval) {
$this->handleSignal($interval);
});
}
/**
* Forks the current process to create a child process that will send periodic signals to the parent
*
* @param int $interval
*/
private function periodicAlarm(int $interval): void
{
$parent = getmypid();
$pid = pcntl_fork();
if(!$pid) {
while (true){
sleep($interval);
posix_kill($parent, $this->signal);
}
} else {
$this->childPid = $pid;
}
}
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPBasicCancelException extends \Exception implements AMQPExceptionInterface
{
/**
* @var string
* @internal Use getter getConsumerTag()
*/
public $consumerTag;
/**
* @param string $consumerTag
*/
public function __construct($consumerTag)
{
parent::__construct('Channel was canceled');
$this->consumerTag = $consumerTag;
}
/**
* @return string
*/
public function getConsumerTag()
{
return $this->consumerTag;
}
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPChannelClosedException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPConnectionBlockedException extends AMQPRuntimeException
{
public function __construct($message = '', $code = 0, $previous = null)
{
if (empty($message)) {
$message = 'Connection is blocked due to low resources';
}
parent::__construct($message, $code, $previous);
}
}
<?php
namespace PhpAmqpLib\Exception;
/**
* When connection was closed by server, proxy or some tunnel due to timeout or network issue.
*/
class AMQPConnectionClosedException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPDataReadException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPEmptyDeliveryTagException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
interface AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPHeartbeatMissedException extends AMQPConnectionClosedException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPIOException extends \Exception implements AMQPExceptionInterface
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPIOWaitException extends AMQPRuntimeException
{
}
<?php
namespace PhpAmqpLib\Exception;
class AMQPInvalidArgumentException extends \RuntimeException implements AMQPExceptionInterface
{
}