If you use consumer groups to consume log data, you do not need to handle the implementation details of Log Service, load balancing among consumers, or failover between consumers. You can focus on the business logic of processing log data.

Terms

  • consumer group: A consumer group consists of multiple consumers. The consumers in a consumer group consume data from the same Logstore, and data that is consumed by one consumer in the group is not consumed by the others. You can create a maximum of 30 consumer groups for a Logstore.
  • consumer: A consumer is a member of a consumer group and consumes data. The names of the consumers in a consumer group must be unique.
A Logstore has multiple shards. The consumer library allocates the shards to the consumers in a consumer group based on the following rules:
  • Each shard can be allocated to only one consumer.
  • Each consumer can consume data from multiple shards.

After a new consumer is added to a consumer group, the shards are reallocated among all consumers in the group for load balancing. The reallocation follows the preceding rules.
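The allocation rules above can be illustrated with a simple round-robin sketch. This is a hypothetical model written for this explanation. It is not the balancing logic that the consumer library actually uses, and `ShardAllocationSketch` and `allocate` are illustrative names. The sketch shows three things: each shard gets exactly one owner, one consumer can own several shards, and adding a consumer redistributes the shards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of the allocation rules; not the consumer library's real balancing logic.
class ShardAllocationSketch {

    // Assigns each shard to exactly one consumer, cycling through the consumers in name order.
    static Map<String, List<Integer>> allocate(List<Integer> shardIds, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        List<String> sorted = new ArrayList<>(consumers);
        sorted.sort(null); // natural (name) order, so the result does not depend on input order
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String consumer : sorted) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < shardIds.size(); i++) {
            String owner = sorted.get(i % sorted.size()); // one owner per shard
            assignment.get(owner).add(shardIds.get(i));
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<Integer> shards = Arrays.asList(0, 1, 2, 3);
        // Two consumers: each one owns two shards.
        System.out.println(allocate(shards, Arrays.asList("consumer_1", "consumer_2")));
        // A third consumer joins: all shards are reallocated across the three consumers.
        System.out.println(allocate(shards, Arrays.asList("consumer_1", "consumer_2", "consumer_3")));
    }
}
```

Running `main` prints `{consumer_1=[0, 2], consumer_2=[1, 3]}` and then `{consumer_1=[0, 3], consumer_2=[1], consumer_3=[2]}`.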

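The consumer library saves checkpoints that record the consumption progress of each shard. After a program fault is resolved, consumers resume consumption from the last saved checkpoint instead of from the beginning. Data that was consumed after the last checkpoint was saved may be consumed again.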
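As the comment in the SampleLogHubProcessor example notes, checkpoints are saved only periodically. As a result, a restarted consumer may consume a small amount of data again. The following hypothetical in-memory model makes this concrete. Each shard is a list of entries, a checkpoint is the index of the next entry to consume, and a checkpoint is saved every `saveEvery` entries. `CheckpointResumeSketch`, `run`, and the crash simulation are illustrative only and are not part of the consumer library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of checkpoint-based resumption; not the consumer library's API.
class CheckpointResumeSketch {
    // Simulated server-side checkpoint store: shard ID -> index of the next entry to consume.
    static final Map<Integer, Integer> checkpoints = new HashMap<>();

    // Consumes one shard starting from its saved checkpoint. A checkpoint is saved after every
    // `saveEvery` entries, and the run stops ("crashes") after consuming `crashAfter` entries.
    static List<String> run(int shardId, List<String> shardData, int saveEvery, int crashAfter) {
        List<String> consumed = new ArrayList<>();
        int position = checkpoints.getOrDefault(shardId, 0);
        while (position < shardData.size() && consumed.size() < crashAfter) {
            consumed.add(shardData.get(position));
            position++;
            if (consumed.size() % saveEvery == 0) {
                checkpoints.put(shardId, position); // persist progress periodically
            }
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> data = Arrays.asList("a", "b", "c", "d", "e");
        // First run: a checkpoint is saved after "b", then the run stops after consuming "c".
        System.out.println(run(0, data, 2, 3));
        // Second run resumes from the saved checkpoint, so "c" is consumed again.
        System.out.println(run(0, data, 2, Integer.MAX_VALUE));
    }
}
```

The first run prints `[a, b, c]` and the second prints `[c, d, e]`. The last saved checkpoint points at `c`, so `c` is consumed twice.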

Procedure

You can use Log Service SDK for Java, Python, or Go to consume data by using consumer groups. The following example uses Java.

  1. Add Maven dependencies.
    <dependency>
      <groupId>com.google.protobuf</groupId>
      <artifactId>protobuf-java</artifactId>
      <version>2.5.0</version>
    </dependency>
    <dependency>
      <groupId>com.aliyun.openservices</groupId>
      <artifactId>loghub-client-lib</artifactId>
      <version>0.6.33</version>
    </dependency>
  2. Create a Main.java file.
    import com.aliyun.openservices.loghub.client.ClientWorker;
    import com.aliyun.openservices.loghub.client.config.LogHubConfig;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubClientWorkerException;
    
    public class Main {
        // The endpoint of Log Service. For more information, see Endpoints. 
        private static String sEndpoint = "cn-hangzhou.log.aliyuncs.com";
        // The name of the Log Service project. Enter the name based on your business requirements. 
        private static String sProject = "ali-cn-hangzhou-sls-admin";
        // The name of the Logstore. Enter the name based on your business requirements. 
        private static String sLogstore = "sls_operation_log";
        // The name of a consumer group. Enter the name based on your business requirements. 
        private static String sConsumerGroup = "consumerGroupX";
        // The AccessKey ID and AccessKey secret of the user that consumes data. Enter the AccessKey ID and AccessKey secret based on your business requirements. For more information, see AccessKey pair. 
        private static String sAccessKeyId = "";
        private static String sAccessKey = "";
    
        public static void main(String[] args) throws LogHubClientWorkerException, InterruptedException {
            // consumer_1 is the consumer name. The name of each consumer in a consumer group must be unique.
            // If consumers run as processes on different machines to consume data from the same Logstore, you can use the IP address of each machine to identify its consumer.
            // The maxFetchLogGroupSize parameter specifies the maximum number of log groups that are retrieved from the server at a time. Valid values: (0,1000].
            // This example does not set maxFetchLogGroupSize, so the default value is used. We recommend that you use the default value.
            LogHubConfig config = new LogHubConfig(sConsumerGroup, "consumer_1", sEndpoint, sProject, sLogstore, sAccessKeyId, sAccessKey, LogHubConfig.ConsumePosition.BEGIN_CURSOR);
            ClientWorker worker = new ClientWorker(new SampleLogHubProcessorFactory(), config);
            Thread thread = new Thread(worker);
            // ClientWorker implements the Runnable interface, so starting the thread runs the worker.
            thread.start();
            // Let the consumer run for one hour.
            Thread.sleep(60 * 60 * 1000);
            // Stop the ClientWorker instance. The thread that runs it then exits automatically.
            worker.shutdown();
            // The ClientWorker instance runs multiple asynchronous tasks. Wait 30 seconds after shutdown so that all running tasks can stop.
            Thread.sleep(30 * 1000);
        }
    }
  3. Create a SampleLogHubProcessor.java file.
    import com.aliyun.openservices.log.common.FastLog;
    import com.aliyun.openservices.log.common.FastLogContent;
    import com.aliyun.openservices.log.common.FastLogGroup;
    import com.aliyun.openservices.log.common.FastLogTag;
    import com.aliyun.openservices.log.common.LogGroupData;
    import com.aliyun.openservices.loghub.client.ILogHubCheckPointTracker;
    import com.aliyun.openservices.loghub.client.exceptions.LogHubCheckPointException;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessor;
    import com.aliyun.openservices.loghub.client.interfaces.ILogHubProcessorFactory;
    
    import java.util.List;
    
    public class SampleLogHubProcessor implements ILogHubProcessor {
        private int shardId;
        // The time when a checkpoint was last saved. 
        private long mLastCheckTime = 0;
    
        public void initialize(int shardId) {
            this.shardId = shardId;
        }
    
        // The main logic of data consumption. You must include the code to handle all exceptions that may occur during data consumption. 
        public String process(List<LogGroupData> logGroups,
                              ILogHubCheckPointTracker checkPointTracker) {
            // Display the obtained data. 
            for (LogGroupData logGroup : logGroups) {
                FastLogGroup flg = logGroup.GetFastLogGroup();
                System.out.println(String.format("\tcategory\t:\t%s\n\tsource\t:\t%s\n\ttopic\t:\t%s\n\tmachineUUID\t:\t%s",
                        flg.getCategory(), flg.getSource(), flg.getTopic(), flg.getMachineUUID()));
                System.out.println("Tags");
                for (int tagIdx = 0; tagIdx < flg.getLogTagsCount(); ++tagIdx) {
                    FastLogTag logtag = flg.getLogTags(tagIdx);
                    System.out.println(String.format("\t%s\t:\t%s", logtag.getKey(), logtag.getValue()));
                }
                for (int lIdx = 0; lIdx < flg.getLogsCount(); ++lIdx) {
                    FastLog log = flg.getLogs(lIdx);
                    System.out.println("--------\nLog: " + lIdx + ", time: " + log.getTime() + ", GetContentCount: " + log.getContentsCount());
                    for (int cIdx = 0; cIdx < log.getContentsCount(); ++cIdx) {
                        FastLogContent content = log.getContents(cIdx);
                        System.out.println(content.getKey() + "\t:\t" + content.getValue());
                    }
                }
            }
            long curTime = System.currentTimeMillis();
            // Save a checkpoint to the server at most once every 30 seconds. If the ClientWorker instance stops unexpectedly before the next save, a newly started ClientWorker instance resumes consumption from the last saved checkpoint, so a small amount of data may be consumed again.
            if (curTime - mLastCheckTime > 30 * 1000) {
                try {
                    // true: the checkpoint is immediately synchronized to the server. false: the checkpoint is cached locally and synchronized to the server at the default interval of 60 seconds.
                    checkPointTracker.saveCheckPoint(true);
                } catch (LogHubCheckPointException e) {
                    e.printStackTrace();
                }
                mLastCheckTime = curTime;
            }
            return null;
        }
    
        // The ClientWorker instance calls this function when it stops. Save the final checkpoint here so that consumption can resume from it.
        public void shutdown(ILogHubCheckPointTracker checkPointTracker) {
            // Save checkpoints to the server. 
            try {
                checkPointTracker.saveCheckPoint(true);
            } catch (LogHubCheckPointException e) {
                e.printStackTrace();
            }
        }
    }
    
    class SampleLogHubProcessorFactory implements ILogHubProcessorFactory {
        public ILogHubProcessor generatorProcessor() {
            // Generate a consumption instance. 
            return new SampleLogHubProcessor();
        }
    }
    For more information, see Java, Python, and Go.
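The comment in Main.java suggests identifying each consumer by the IP address of its machine. A minimal sketch of building such a name follows. The helper class and the `consumer` prefix are hypothetical. The process ID is appended on the assumption that several consumer processes may run on one machine, in which case an IP address alone would not be unique. `ProcessHandle` requires Java 9 or later.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

// Hypothetical helper that builds a consumer name that is unique per machine and per process.
class ConsumerNameSketch {

    static String consumerName(String prefix) {
        String ip;
        try {
            ip = InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // Fall back to the loopback address if the local host name cannot be resolved.
            ip = InetAddress.getLoopbackAddress().getHostAddress();
        }
        // Append the process ID so that two processes on the same machine get different names.
        long pid = ProcessHandle.current().pid();
        return prefix + "_" + ip + "_" + pid;
    }

    public static void main(String[] args) {
        // The result could be passed as the consumer name to LogHubConfig instead of "consumer_1".
        System.out.println(consumerName("consumer"));
    }
}
```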

View the status of a consumer group

  1. Log on to the Log Service console.
  2. In the Projects section, click the name of the project that you want to view.
  3. Click the Show icon next to the Logstore that you want to view, and then click Data Consumption.
  4. Click a consumer group to view the data consumption progress of each shard.
You can also view the consumption progress by calling Log Service API operations or by using an SDK. The following example uses Log Service SDK for Java:
package test;
import java.util.List;
import com.aliyun.openservices.log.Client;
import com.aliyun.openservices.log.common.Consts.CursorMode;
import com.aliyun.openservices.log.common.ConsumerGroup;
import com.aliyun.openservices.log.common.ConsumerGroupShardCheckPoint;
import com.aliyun.openservices.log.exception.LogException;
public class ConsumerGroupTest {
    static String endpoint = "";
    static String project = "";
    static String logstore = "";
    static String accesskeyId = "";
    static String accesskey = "";
    public static void main(String[] args) throws LogException {
        Client client = new Client(endpoint, accesskeyId, accesskey);
        // Obtain all consumer groups created for the Logstore. If no consumer groups exist, an empty list is returned.
        List<ConsumerGroup> consumerGroups = client.ListConsumerGroup(project, logstore).GetConsumerGroups();
        for(ConsumerGroup c: consumerGroups){
            // Display the properties of consumer groups, including the name, heartbeat timeout, and whether the consumer group consumes data in order. 
            System.out.println("Name: " + c.getConsumerGroupName());
            System.out.println("Heartbeat timeout: " + c.getTimeout());
            System.out.println("Consumption in order: " + c.isInOrder());
            for(ConsumerGroupShardCheckPoint cp: client.GetCheckPoint(project, logstore, c.getConsumerGroupName()).GetCheckPoints()){
                System.out.println("shard: " + cp.getShard());
                // The time when data was last consumed. The value is a long integer that is accurate to milliseconds.
                System.out.println("The time when data was last consumed: " + cp.getUpdateTime());
                System.out.println("Consumer name: " + cp.getConsumer());
                String consumerPrg = "";
                if(cp.getCheckPoint().isEmpty())
                    consumerPrg = "Consumption not started";
                else{
                    // The result is a UNIX timestamp in seconds. Format it as needed.
                    try{
                        int prg = client.GetPrevCursorTime(project, logstore, cp.getShard(), cp.getCheckPoint()).GetCursorTime();
                        consumerPrg = "" + prg;
                    }
                    catch(LogException e){
                        if("InvalidCursor".equals(e.GetErrorCode()))
                            consumerPrg = "Invalid. The point in time when data was last consumed is out of the retention period of the data.";
                        else{
                            // Rethrow other errors, such as internal server errors.
                            throw e;
                        }
                    }
                }
                System.out.println("Consumption progress: " + consumerPrg);
                String endCursor = client.GetCursor(project, logstore, cp.getShard(), CursorMode.END).GetCursor();
                int endPrg = 0;
                try{
                    endPrg = client.GetPrevCursorTime(project, logstore, cp.getShard(), endCursor).GetCursorTime();
                }
                catch(LogException e){
                    // Ignore the error. endPrg remains 0.
                }
                // The result is a UNIX timestamp in seconds. Format it as needed.
                System.out.println("The time when the last data entry was received: " + endPrg);
            }
        }
    }
}
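ConsumerGroupTest prints the consumption progress and the time of the last received entry as raw UNIX timestamps in seconds. The following sketch formats such values and computes how far consumption lags behind the newest data in a shard. The class, its method names, the Asia/Shanghai time zone, and the timestamp values in `main` are assumptions for illustration.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helpers for presenting the values printed by ConsumerGroupTest.
class ConsumptionProgressSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Formats a UNIX timestamp in seconds as a date and time in the given time zone.
    static String format(long epochSeconds, ZoneId zone) {
        return FORMAT.format(Instant.ofEpochSecond(epochSeconds).atZone(zone));
    }

    // Seconds between the last received entry and the consumption progress of a shard.
    // Returns -1 if either timestamp is unavailable (0), for example when consumption has not started.
    static long lagSeconds(long progressSeconds, long lastReceivedSeconds) {
        if (progressSeconds <= 0 || lastReceivedSeconds <= 0) {
            return -1;
        }
        return Math.max(0, lastReceivedSeconds - progressSeconds);
    }

    public static void main(String[] args) {
        ZoneId zone = ZoneId.of("Asia/Shanghai"); // assumption: the time zone used for display
        long progress = 1510675200L;     // hypothetical value of prg
        long lastReceived = 1510675800L; // hypothetical value of endPrg
        System.out.println("Consumption progress: " + format(progress, zone));
        System.out.println("Lag in seconds: " + lagSeconds(progress, lastReceived));
    }
}
```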

What to do next

  • Configure Log4j
    We recommend that you configure Log4j for the consumer program so that the error messages of consumer groups are printed. This helps you troubleshoot errors. The following log4j.properties file shows a commonly used configuration:
    log4j.rootLogger = info,stdout
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
    After you configure Log4j, you can view information about exceptions that occur when you run the consumer program. The following sample exception information is for your reference:
    [WARN ] 2018-03-14 12:01:52,747 method:com.aliyun.openservices.loghub.client.LogHubConsumer.sampleLogError(LogHubConsumer.java:159)
    com.aliyun.openservices.log.exception.LogException: Invalid loggroup count, (0,1000]
  • Consume data starting from a specific point in time
    LogHubConfig provides the following constructors for specifying where consumption starts:
    // consumerStartTimeInSeconds is a UNIX timestamp in seconds. Data generated after this point in time is consumed.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          int consumerStartTimeInSeconds);
    
    // position is a LogHubConfig.ConsumePosition enumeration value. LogHubConfig.ConsumePosition.BEGIN_CURSOR starts consumption from the earliest data, and LogHubConfig.ConsumePosition.END_CURSOR starts consumption from the latest data.
    public LogHubConfig(String consumerGroupName, 
                          String consumerName, 
                          String loghubEndPoint,
                          String project, String logStore,
                          String accessId, String accessKey,
                          ConsumePosition position);
    Note
    • You can use different constructors based on your business requirements.
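    • If a checkpoint for the consumer group is already stored on the server, consumption starts from that checkpoint, and the start time or start position specified in the constructor does not take effect.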
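  • Reset a checkpoint
    The following method resets the checkpoint of every shard in a consumer group to a specific point in time.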
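    import java.sql.Timestamp;
    import com.aliyun.openservices.log.Client;
    import com.aliyun.openservices.log.common.Shard;
    import com.aliyun.openservices.log.request.ListShardRequest;

    public class ResetCheckpoint {
        static String host = "", accessId = "", accessKey = "", project = "", logStore = "", consumerGroup = ""; // Enter values based on your business requirements.

        public static void updateCheckpoint() throws Exception {
            Client client = new Client(host, accessId, accessKey);
            // UNIX timestamp in seconds. Timestamp.valueOf interprets the string in the default time zone of the JVM.
            long timestamp = Timestamp.valueOf("2017-11-15 00:00:00").getTime() / 1000;
            for (Shard shard : client.ListShard(new ListShardRequest(project, logStore)).GetShards()) {
                int shardId = shard.GetShardId();
                // Obtain the cursor for the point in time and save it as the checkpoint of the shard.
                String cursor = client.GetCursor(project, logStore, shardId, timestamp).GetCursor();
                client.UpdateCheckPoint(project, logStore, consumerGroup, shardId, cursor);
            }
        }
    }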
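Both consumerStartTimeInSeconds and the updateCheckpoint example expect a UNIX timestamp in seconds. `Timestamp.valueOf` interprets its argument in the JVM's default time zone, so the same string can produce different timestamps on machines with different settings. The following sketch names the time zone explicitly. The class name and the Asia/Shanghai zone are assumptions for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: converts a local date-time string to a UNIX timestamp in seconds.
class StartTimeSketch {
    private static final DateTimeFormatter FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    static int toEpochSeconds(String dateTime, ZoneId zone) {
        long seconds = LocalDateTime.parse(dateTime, FORMAT).atZone(zone).toEpochSecond();
        // consumerStartTimeInSeconds is declared as an int, so fail fast on values that do not fit.
        return Math.toIntExact(seconds);
    }

    public static void main(String[] args) {
        int start = toEpochSeconds("2017-11-15 00:00:00", ZoneId.of("Asia/Shanghai"));
        System.out.println(start); // 1510675200
    }
}
```

The result could be passed as consumerStartTimeInSeconds, or used in place of the `timestamp` variable in updateCheckpoint.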

Authorize a RAM user to access consumer groups

Before you use a RAM user to access consumer groups, you must grant the required permissions to the RAM user. For more information, see Grant permissions to a RAM user.

The following list describes the consumer group actions that you can grant to a RAM user, together with the resource that each action applies to.
  • log:GetCursorOrData (GetCursor and PullLogs)
    Description: Obtains cursors based on the time when logs are generated.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
  • log:CreateConsumerGroup
    Description: Creates a consumer group for a specific Logstore.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
  • log:ListConsumerGroup
    Description: Queries all consumer groups of a specific Logstore.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/*
  • log:ConsumerGroupUpdateCheckPoint
    Description: Updates the consumption checkpoint of a shard in a specific consumer group.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:ConsumerGroupHeartBeat
    Description: Sends a heartbeat packet to Log Service for a consumer.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:UpdateConsumerGroup
    Description: Modifies the properties of a specific consumer group.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
  • log:GetConsumerGroupCheckPoint
    Description: Retrieves the consumption checkpoints of one or all shards in a specific consumer group.
    Resource: acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}/consumergroup/${consumerGroupName}
For example, the project-test project resides in the China (Hangzhou) region. The ID of the Alibaba Cloud account to which the project belongs is 174649****602745. The name of the Logstore from which you want to consume log data is logstore-test, and the consumer group name is consumergroup-test. To allow a RAM user to access the consumer group, you must grant the following permissions to the RAM user:
{
  "Version": "1",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "log:GetCursorOrData"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:CreateConsumerGroup",
        "log:ListConsumerGroup"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "log:ConsumerGroupUpdateCheckPoint",
        "log:ConsumerGroupHeartBeat",
        "log:UpdateConsumerGroup",
        "log:GetConsumerGroupCheckPoint"
      ],
      "Resource": "acs:log:cn-hangzhou:174649****602745:project/project-test/logstore/logstore-test/consumergroup/consumergroup-test"
    }
  ]
}
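The resource strings in the policy follow the Resource patterns listed above. The following sketch fills in those patterns for given values. The class and method names are hypothetical, and the values in `main` are the example values used in the policy.

```java
// Hypothetical helper that fills in the Resource patterns for consumer group authorization.
class ConsumerGroupArnSketch {

    // acs:log:${regionName}:${projectOwnerAliUid}:project/${projectName}/logstore/${logstoreName}
    static String logstoreArn(String region, String ownerUid, String project, String logstore) {
        return String.format("acs:log:%s:%s:project/%s/logstore/%s", region, ownerUid, project, logstore);
    }

    // Pass "*" as consumerGroup to match all consumer groups of the Logstore.
    static String consumerGroupArn(String region, String ownerUid, String project, String logstore, String consumerGroup) {
        return logstoreArn(region, ownerUid, project, logstore) + "/consumergroup/" + consumerGroup;
    }

    public static void main(String[] args) {
        String region = "cn-hangzhou";
        String uid = "174649****602745";
        System.out.println(logstoreArn(region, uid, "project-test", "logstore-test"));
        System.out.println(consumerGroupArn(region, uid, "project-test", "logstore-test", "*"));
        System.out.println(consumerGroupArn(region, uid, "project-test", "logstore-test", "consumergroup-test"));
    }
}
```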