This topic describes how to use the Message Queue for Apache RocketMQ SDK for Java to subscribe to messages.

Subscription modes

Message Queue for Apache RocketMQ supports the following subscription modes:

  • Clustering subscription

    Consumers with the same group ID share the messages of a topic evenly, and each message is consumed by only one consumer in the group. For example, assume a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes three messages. Example:

    // The configuration of clustering subscription (default mode).
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
  • Broadcasting subscription

    Each consumer with the same group ID consumes every message once. For example, assume a topic contains nine messages and a group contains three consumer instances. In this case, each instance consumes all nine messages. Example:

    // The configuration of broadcasting subscription.
    properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);               
Note
  • Ensure that all consumer instances with the same group ID use the same subscription mode. For more information, see Subscription consistency.
  • The two subscription modes have different functional limits. For example, the broadcasting subscription mode does not support ordered messages, maintenance of consumption progress, or resetting of consumer offsets. For more information, see Clustering consumption and broadcasting consumption.
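
The difference between the two modes can be illustrated with a small, self-contained simulation of the nine-message, three-consumer example above. This is only a sketch: it does not use the SDK, the class and method names are hypothetical, and the round-robin assignment is an illustrative assumption rather than the allocation that the service actually performs.

```java
import java.util.ArrayList;
import java.util.List;

public class SubscriptionModeSketch {
    // Clustering: each message is delivered to exactly one consumer in the group.
    // Round-robin is an assumption made for illustration only.
    public static List<List<Integer>> clustering(int messageCount, int consumerCount) {
        List<List<Integer>> inboxes = emptyInboxes(consumerCount);
        for (int msg = 0; msg < messageCount; msg++) {
            inboxes.get(msg % consumerCount).add(msg);
        }
        return inboxes;
    }

    // Broadcasting: every consumer in the group receives every message once.
    public static List<List<Integer>> broadcasting(int messageCount, int consumerCount) {
        List<List<Integer>> inboxes = emptyInboxes(consumerCount);
        for (int msg = 0; msg < messageCount; msg++) {
            for (List<Integer> inbox : inboxes) {
                inbox.add(msg);
            }
        }
        return inboxes;
    }

    // Creates one empty list of received message numbers per consumer.
    private static List<List<Integer>> emptyInboxes(int consumerCount) {
        List<List<Integer>> inboxes = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        System.out.println("clustering:   " + clustering(9, 3));
        System.out.println("broadcasting: " + broadcasting(9, 3));
    }
}
```

In the clustering result each of the three lists holds three distinct message numbers, while in the broadcasting result each list holds all nine.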

Message obtaining modes

Message Queue for Apache RocketMQ supports the following two modes for obtaining messages:

  • Push: Push consumers receive messages pushed to them from Message Queue for Apache RocketMQ.
  • Pull: Pull consumers pull messages from Message Queue for Apache RocketMQ.

Pull consumers provide more options to subscribe to messages and give you more control over message pulling. For more information, see Operations and parameters.

Notice: Message Queue for Apache RocketMQ Enterprise Platinum Edition is required to use pull consumers.
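
As a rough mental model of the two obtaining modes, the following sketch contrasts a callback that is invoked for each message (push) with a loop in which the application decides when to ask for the next message and how long to wait (pull). It does not use the SDK: the class name, the method names, and the in-memory queue that stands in for the service are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class ObtainingModeSketch {
    // Push style: the application registers a callback, and the source invokes it per message.
    public static List<String> consumeByPush(List<String> source) {
        List<String> handled = new ArrayList<>();
        Consumer<String> listener = handled::add; // stands in for a message listener
        for (String msg : source) {
            listener.accept(msg);
        }
        return handled;
    }

    // Pull style: the application polls with a timeout and stops when nothing arrives in time.
    public static List<String> consumeByPull(BlockingQueue<String> source, long timeoutMillis) {
        List<String> handled = new ArrayList<>();
        try {
            String msg;
            while ((msg = source.poll(timeoutMillis, TimeUnit.MILLISECONDS)) != null) {
                handled.add(msg);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt status and return what was handled so far.
            Thread.currentThread().interrupt();
        }
        return handled;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3");
        System.out.println("push: " + consumeByPush(messages));
        System.out.println("pull: " + consumeByPull(new LinkedBlockingQueue<>(messages), 100));
    }
}
```

Both methods end up handling the same messages. The difference is who drives delivery, which is why the pull style leaves more control with the application.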

Sample code

The following sample code shows how to subscribe to messages in push mode and in pull mode. For more samples, see Message Queue for Apache RocketMQ code library.

  • Push
    import com.aliyun.openservices.ons.api.Action;
    import com.aliyun.openservices.ons.api.ConsumeContext;
    import com.aliyun.openservices.ons.api.Consumer;
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.MessageListener;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    
    import java.util.Properties;
    
    public class ConsumerTest {
        public static void main(String[] args) {
            Properties properties = new Properties();
            // The group ID that you created in the console.
            properties.put(PropertyKeyConst.GROUP_ID, "XXX");
            // The AccessKey ID that you created in the Alibaba Cloud RAM console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.AccessKey, "XXX");
            // The AccessKey secret that you created in the Alibaba Cloud RAM console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.SecretKey, "XXX");
            // The TCP endpoint. You can find the endpoint in the TCP Client Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "XXX");
            // To set the subscription mode explicitly, uncomment one of the following lines
            // and import com.aliyun.openservices.ons.api.PropertyValueConst.
            // Clustering subscription (default).
            // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
            // Broadcasting subscription.
            // properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.BROADCASTING);

            Consumer consumer = ONSFactory.createConsumer(properties);
            // Subscribe to multiple tags of a topic.
            consumer.subscribe("TopicTestMQ", "TagA||TagB", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    System.out.println("Receive: " + message);
                    return Action.CommitMessage;
                }
            });

            // Subscribe to all tags of another topic. To unsubscribe from this topic, delete the subscription code and restart the consumer.
            consumer.subscribe("TopicTestMQ-Other", "*", new MessageListener() {
                @Override
                public Action consume(Message message, ConsumeContext context) {
                    System.out.println("Receive: " + message);
                    return Action.CommitMessage;
                }
            });

            consumer.start();
            System.out.println("Consumer Started");
        }
    }
  • Pull
    import com.aliyun.openservices.ons.api.Message;
    import com.aliyun.openservices.ons.api.ONSFactory;
    import com.aliyun.openservices.ons.api.PropertyKeyConst;
    import com.aliyun.openservices.ons.api.PullConsumer;
    import com.aliyun.openservices.ons.api.TopicPartition;
    import java.util.List;
    import java.util.Properties;
    import java.util.Set;
    
    public class PullConsumerClient {
        public static void main(String[] args){
            Properties properties = new Properties();
            // The group ID that you created in the console.
            properties.put(PropertyKeyConst.GROUP_ID, "GID-xxxxx");
            // The AccessKey ID that you created in the Alibaba Cloud RAM console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.AccessKey, "xxxxxxx");
            // The AccessKey secret that you created in the Alibaba Cloud RAM console for verifying the Alibaba Cloud account.
            properties.put(PropertyKeyConst.SecretKey, "xxxxxxx");
            // The TCP endpoint. You can find the endpoint in the TCP Client Endpoint section on the Instance Details page in the Message Queue for Apache RocketMQ console.
            properties.put(PropertyKeyConst.NAMESRV_ADDR, "xxxxx");
            PullConsumer consumer = ONSFactory.createPullConsumer(properties);
            // Start the consumer.
            consumer.start();
            // Query all partitions of topic-xxx.
            Set<TopicPartition> topicPartitions = consumer.topicPartitions("topic-xxx");
            // Assign the partitions from which you want to pull messages. In this example, all partitions of the topic are assigned.
            consumer.assign(topicPartitions);
    
            while (true) {
                // Pull messages. If no messages are available, wait up to 3,000 ms.
                List<Message> messages = consumer.poll(3000);
                System.out.printf("Received messages: %s%n", messages);
            }
        }
    }

    For more information about partitions and offsets, see Terms.
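
The subscription expressions used in the push sample, `TagA||TagB` and `*`, can be read as "any of the listed tags" and "all tags". The following sketch shows that reading as plain Java. It is an illustration only and not part of the SDK: the class name and the `matches` helper are hypothetical, and how the service evaluates expressions internally is not shown here.

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class TagExpressionSketch {
    // Returns true if a message with the given tag would match the subscription expression.
    public static boolean matches(String expression, String messageTag) {
        // An absent expression or "*" is treated as "subscribe to all tags" in this sketch.
        if (expression == null || expression.trim().isEmpty() || "*".equals(expression.trim())) {
            return true;
        }
        // "TagA||TagB" lists alternative tags separated by "||".
        Set<String> wanted = Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(tag -> !tag.isEmpty())
                .collect(Collectors.toSet());
        return wanted.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("TagA||TagB", "TagA"));
        System.out.println(matches("TagA||TagB", "TagC"));
        System.out.println(matches("*", "TagC"));
    }
}
```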

More information

For best practices on consumer throttling in Message Queue for Apache RocketMQ, see RocketMQ client traffic control design.