Publish and consume with RabbitMQ and PHP

Posted on April 8, 2014, 4:35 pm by about-dev.com


Meet RabbitMQ, an excelent messaging broker that helps you decouple application that need to work toghether. This post presents the steps you have to take to use RabbitMQ with the PHP AMQP library. 

For an easy understanding of this blog post you can read about RabbitMQ here and about the PHP AMQP library here.

To use the code from this post, you have to copy and paste the content into the indicated spots, install RabbitMQ on your devel machine, configure your application to publish messages onto an exchange and to consume messages from a queue. At the end of this article I posted a link to the consumer si publisher files from Github.

Let's start with the steps you have to take to use the PHP AMQP extension to publish messages and to consume messages:

Step 0: install RabbitMQ (see how to install RabbitMQ here)

Step 1: add the AMQP extension to PHP (you can download the proper DLL from here)

Step 2: create an exchange (this is the place where you should publish your messages) using the RabbitMQ easy to use interface (I named my exchange "mine.test")

Step 3: bind the exchange created above to a queue which will consume your messages (for test purpose you can create a queue and bind your exchange to this queue;); you should choose one of the RabbitMQ binding mechanism (I'll use "topic")

Step 4: create the code for publishing to the exchange - publisher.php

// it produces messages ==> publish them on an exchange, the "mine.test" exchange
try{
 //establish the connection to an AMQP server
 $amqpConn = new AMQPConnection();
 $amqpConn->connect();

 if(!$amqpConn->isConnected()){
  die('Conexiune esuata!');
 }

 //creates the channel of comunication - mandatory !!!
 $channel = new AMQPChannel($amqpConn);
 if(!$channel->isConnected()){
  die('Connection through channel failed!');
 }

 //sets the exchange
 $exchangeName= 'mine.test';
 $exchange    = new AMQPExchange($channel);
 $exchange->setName($exchangeName);
 
 //sets the queue
 $queueName  = 'mine.test';
 $queue      = new AMQPQueue($channel);
 $queue->setName($queueName);
 $routingKey  = '';

 //publish the message
 $message = "Hello world!";
 if($exchange->publish($message, $routingKey)){
  echo 'Published!';
 }
}catch(AMQPException $e){
 echo 'AMQP Exception - '.$e->getMessage();
}catch(AMQPConnectionException $e){
 echo 'AMQP Connection Exception - '.$e->getMessage();
}catch(AMQPExchangeException $e){
 echo 'AMQP Exchange Exception - '.$e->getMessage();
}catch(AMQPQueueException  $e){
 echo 'AMQP Queue Exception - '.$e->getMessage();
}

Step 5: create the code that for consuming messages from a queue (my queue is named "mine.test") - publisher.php

//consumes messages from a queue, the "mine.test" queue using a routing key
try{
 //establish the connection to an AMQP server
 $amqpConn = new AMQPConnection();
 $amqpConn->connect();

 if(!$amqpConn->isConnected()){
  die('Failed to connect!');
 }

 //creates the channel of comunication - mandatory !!!
 $channel = new AMQPChannel($amqpConn);
 if(!$channel->isConnected()){
  die('Connection through channel failed!');
 }

 //sets the exchange
 $exchangeName= 'mine.test';
 $exchange    = new AMQPExchange($channel);
 $exchange->setName($exchangeName);

 //sets the queue
 $queueName= 'mine.test';
 $queue    = new AMQPQueue($channel);
 $queue->setName($queueName);

 $counter = 0;
 while($envelope = $queue->get()){
  //get message payload
  $message = $envelope->getBody();
  if($message){
   echo $message.'';
   //inform the queue that the message was acknowledged 
   $queue->ack($envelope->getDeliveryTag());
  }else{
   $queue->nack($envelope->getDeliveryTag(), AMQP_REQUEUE);
  }

  $counter++;
 }

 if($counter){
  echo 'Consuming...';
 }else{
  echo 'No messages to consume...';
 }
}catch(AMQPException $e){
 echo 'AMQP Exception - '.$e->getMessage();
}catch(AMQPConnectionException $e){
 echo 'AMQP Connection Exception - '.$e->getMessage();
}catch(AMQPExchangeException $e){
 echo 'AMQP Exchange Exception - '.$e->getMessage();
}catch(AMQPQueueException  $e){
 echo 'AMQP Queue Exception - '.$e->getMessage();
}

Step 6: test the structure

  • open the CLI and type:
>>> php pusblisher.php

 (this should publish a message on the exchange and you should see it in the RabbitMQ interface)

  •  then type:
>>>php consumer.php

 (this should get the the message from the queue and consume it and you should see in the RabbitMQ interface that the message published will dissapear)

Download source files: Download source code and installation instructions


Leave a Comment:

User
Email
Website

Blog Search

Popular Blog Categories

Newsletter

Want to be informed about latest posts? Subscribe to our newsletter