When you pull data incrementally, Log Service fetches only the data that was added or updated since the previous pull, which makes each pull more efficient. This topic describes how to use the res_rds_mysql function to obtain incremental data from an ApsaraDB RDS for MySQL instance.

Prerequisites

  • Log Service
    • Data is uploaded to the source Logstore. For more information, see Data collection.
    • The destination Logstore is created. For more information, see Create a Logstore.
    • If you use a Resource Access Management (RAM) user, you must grant the user the permissions to transform data. For more information, see Authorize a RAM user to manage a data transformation task.
    • Indexes are configured for the source and destination Logstores. For more information, see Configure indexes.

      Data transformation itself does not depend on indexes. However, you cannot query or analyze data in a Logstore that has no indexes configured.

  • RDS
    • An ApsaraDB RDS database and an account that is used to connect to the database are created. For more information, see Create accounts and databases for an ApsaraDB RDS for MySQL instance.
    • Data is uploaded to the databases and tables of an ApsaraDB RDS for MySQL instance.
    • A whitelist is configured for the ApsaraDB RDS for MySQL instance. For more information, see Configure an IP address whitelist for an ApsaraDB RDS for MySQL instance.
      Notice If you use the res_rds_mysql function to pull data from an ApsaraDB RDS for MySQL database, you must create a whitelist and add 0.0.0.0 to the whitelist. This allows all IP addresses to access the database and therefore exposes the database to security risks. If you want to add only specific IP addresses to the whitelist, submit a ticket.

Background information

A technology enterprise stores customer information in an ApsaraDB RDS for MySQL instance and stores customer maintenance logs in a Logstore of Log Service. The two types of data are continuously updated. The enterprise wants to perform a JOIN operation on the two types of data and save the results to a new Logstore.

In this case, you can use the res_rds_mysql function that is provided by Log Service. The function pulls data from an ApsaraDB RDS for MySQL instance so that the data can be joined with log data and saved to a destination Logstore. When a running data transformation task uses the incremental pull method, Log Service pulls only newly added or updated data based on the timestamp field of the database table. This improves the performance of data pulls. The function is suitable for scenarios that have the following characteristics:

  • A large amount of data exists in databases.
  • Data is frequently updated.
  • Data is not frequently deleted.
  • Real-time data transformation is required.

For more information about the incremental pull method and other pull methods, see res_rds_mysql.
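
The idea behind an incremental pull can be illustrated with a small simulation. This is only a sketch under stated assumptions, not the Log Service implementation: the pull_incremental helper, the in-memory list that stands in for the client table, and all telephone numbers and timestamps are hypothetical. The sketch keeps a watermark (the largest update_time seen so far) and copies only rows whose update_time is newer than that watermark.

```python
# Illustrative simulation only; this is not how Log Service is implemented.
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def pull_incremental(source_rows: List[Row], cache: Dict[str, Row], last_seen: int,
                     primary_key: str = "c_id",
                     update_time_key: str = "update_time") -> Tuple[int, List[str]]:
    """Copy rows changed after last_seen into cache.

    Returns the new watermark and the primary keys of the rows that were copied.
    """
    newest = last_seen
    copied: List[str] = []
    for row in source_rows:
        updated_at = int(row[update_time_key])
        if updated_at > last_seen:
            # Upsert by primary key: new rows are added, updated rows are overwritten.
            key = str(row[primary_key])
            cache[key] = dict(row)
            copied.append(key)
            newest = max(newest, updated_at)
    return newest, copied


# Hypothetical contents of the client table (values are made up for illustration).
table: List[Row] = [
    {"c_id": "1", "name": "maki", "telephone": "010-123", "update_time": 1606358931},
    {"c_id": "2", "name": "tom", "telephone": "010-456", "update_time": 1606358940},
]

cache: Dict[str, Row] = {}
watermark, first_pull = pull_incremental(table, cache, last_seen=0)

# One row is updated in the database; the other row is left untouched.
table[0] = {"c_id": "1", "name": "maki", "telephone": "010-789", "update_time": 1606359000}
watermark, second_pull = pull_incremental(table, cache, last_seen=watermark)

print(first_pull)   # both rows are copied on the first pull
print(second_pull)  # only the updated row is copied on the second pull
```

A real database would apply the timestamp filter on the server side instead of scanning every row. The scan here only keeps the sketch self-contained.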

Resources and examples

  • Log Service resources
    • Project: client-project
    • Source Logstore: client-log

      Example: [Figure: Logstore]
    • Destination Logstore: client-information
  • ApsaraDB RDS resources
    • Database: client-db
    • Table: client

      Example: [Figure: ApsaraDB RDS database]
    • Username and password for the database: test and test1234@@
    • Public endpoint of the database: rm-bp1k****tp8o.mysql.rds.aliyuncs.com

Procedure

  1. Log on to the Log Service console.
  2. Go to the data transformation page.
    1. In the Projects section, click client-project.
    2. Choose Log Storage > Logstores. On the Logstores tab, click client-log.
    3. On the Search & Analysis page, click Data Transformation.
  3. In the upper-right corner of the page, select a time range in which you want to query log data.
    Make sure that log data exists on the Raw Logs tab.
  4. In the edit box, enter a data transformation statement.
    For more information, see res_rds_mysql.
    e_table_map(
        res_rds_mysql(
            "rm-bp1k****tp8o.mysql.rds.aliyuncs.com",     
            "test",
            "test1234@@",
            "client-db",
            table="client",
            fields=["c_id", "name", "telephone", "update_time"],
            refresh_interval=1,
            primary_keys="c_id",
            update_time_key="update_time",
            deleted_flag_key=None,
        ),
        "c_id",
        ["name", "telephone"],
    )
  5. View data in quick preview mode.
    Use quick preview mode to check whether the transformation statement that you entered is valid. For more information, see Quick preview.
    1. Click Quick Rule Creation.
    2. Choose Data Testing > Data. On the Data tab, enter the following content:
      {
        "__source__": "192.0.2.0",
        "__time__": 1624956516,
        "__topic__": "log",
        "__tag__:__client_ip__": "192.0.5.0",
        "c_id": "1",
        "staff_id": "002",
        "status": "Follow up",
        "tag": "Revisit"
      }
    3. Choose Data Testing > Dimension Table. On the Dimension Table tab, enter the following content:
      c_id,name,telephone,update_time,delete_flag
      1,maki,010-123,1606358931,false
    4. Click Preview Data.
      View the transformation results.
  6. View data in advanced preview mode.
    Use advanced preview mode to check whether Log Service can connect to the ApsaraDB RDS for MySQL instance. For more information, see Advanced preview.
    1. Click the Advanced tab.
    2. Click Preview Data.
    3. In the Add Preview Settings panel, set an authorization method. Then, click OK.
    4. View the transformation results.

      If a runtime error occurs, fix the error. For more information, see How do I fix errors in the syntax used to load data from ApsaraDB RDS for MySQL?

  7. Create a data transformation task.
    1. Click Save as Transformation Rule.
    2. In the Create Data Transformation Rule panel, configure the required parameters. Then, click OK.
      For more information about the parameters, see Create a data transformation task.
      After the data is transformed, you can view the data in the destination Logstore.
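
The join that the transformation statement in step 4 performs on the test data in step 5 can be approximated in plain Python. This is a minimal sketch and not the e_table_map implementation: the table_map helper is hypothetical, and the choice to pass unmatched events through unchanged is an assumption made for illustration. The log event and the dimension table row are the ones entered on the Data and Dimension Table tabs.

```python
import csv
import io


def table_map(event, dimension_rows, key, output_fields):
    """Return a copy of event enriched with output_fields from the matching row.

    Hypothetical stand-in for e_table_map. An event without a matching row is
    returned unchanged, which is an assumption of this sketch.
    """
    lookup = {row[key]: row for row in dimension_rows}
    enriched = dict(event)
    match = lookup.get(event.get(key))
    if match is not None:
        for field in output_fields:
            enriched[field] = match[field]
    return enriched


# Dimension table content from the Dimension Table tab.
dimension_csv = """c_id,name,telephone,update_time,delete_flag
1,maki,010-123,1606358931,false
"""
dimension_rows = list(csv.DictReader(io.StringIO(dimension_csv)))

# Log event from the Data tab.
event = {
    "__source__": "192.0.2.0",
    "__time__": 1624956516,
    "__topic__": "log",
    "__tag__:__client_ip__": "192.0.5.0",
    "c_id": "1",
    "staff_id": "002",
    "status": "Follow up",
    "tag": "Revisit",
}

result = table_map(event, dimension_rows, "c_id", ["name", "telephone"])
print(result["name"], result["telephone"])  # maki 010-123
```

The event keeps all of its original fields and gains the name and telephone fields of the row whose c_id matches.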

FAQ

How can I use the delete feature in the incremental pull method?

In the incremental pull method, Log Service pulls only newly added or updated data based on the primary key and the time field of the database table. If you only label an entry in the table as deleted, for example, by setting delete_flag=true, Log Service still pulls and transforms the entry. To handle this case, the res_rds_mysql function provides the deleted_flag_key parameter. If you set the deleted_flag_key parameter in a data transformation statement, the data transformation task obtains the updated data of the database table and then removes the rows whose delete_flag field is true from the in-memory dimension table. The data in the database table remains unchanged. The removed rows are excluded from subsequent JOIN operations on the log data.

Notice
  • If the delete_flag field of an entry is set to true or 1, the entry is treated as deleted. For more information, see res_rds_mysql.
  • If you set the deleted_flag_key parameter in a data transformation statement, you must also set the update_time_key parameter.

For example, an ApsaraDB RDS for MySQL database table contains the name=mia and name=tom entries, and the delete_flag field of the name=mia entry is set to true to label the entry as deleted. When Log Service updates the in-memory dimension table, the name=mia entry is removed from the in-memory dimension table. The name=mia entry remains in the database table, and the name=tom entry remains in the in-memory dimension table.

Example:

e_table_map(
    res_rds_mysql(
        "rm-bp1****l3tp.mysql.rds.aliyuncs.com",     
        "test",
        "test1234@@",
        "client-db",
        table="client",
        fields=["c_id", "name", "telephone", "update_time"],
        refresh_interval=1,
        primary_keys="c_id",
        update_time_key="update_time",
        deleted_flag_key="delete_flag",
    ),
    "c_id",
    ["name", "telephone"],
)
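
The effect of deleted_flag_key on the in-memory dimension table can be sketched as follows. This is an illustration under assumptions, not the Log Service implementation: the refresh_dimension_table helper and the c_id and update_time values are hypothetical. The mia and tom entries and the rule that true or 1 marks an entry as deleted come from this topic.

```python
def refresh_dimension_table(cache, changed_rows, primary_key="c_id",
                            deleted_flag_key="delete_flag"):
    """Apply pulled rows to the in-memory table, dropping rows flagged as deleted.

    Hypothetical helper for illustration. The database rows themselves are
    never modified; only the in-memory copy changes.
    """
    for row in changed_rows:
        key = str(row[primary_key])
        flag = str(row.get(deleted_flag_key, "")).strip().lower()
        if flag in ("true", "1"):
            cache.pop(key, None)    # flagged as deleted: remove from the in-memory table
        else:
            cache[key] = dict(row)  # new or updated row: add or overwrite
    return cache


# Hypothetical rows returned by one incremental pull (c_id and update_time are made up).
pulled = [
    {"c_id": "1", "name": "mia", "update_time": 1606358931, "delete_flag": "true"},
    {"c_id": "2", "name": "tom", "update_time": 1606358940, "delete_flag": "false"},
]

dimension_table = refresh_dimension_table({}, pulled)
print(sorted(row["name"] for row in dimension_table.values()))  # ['tom']
```

Because the mia row is missing from the in-memory table, a log whose c_id matches that row no longer finds a match in the JOIN operation, while the pulled list that stands in for the database table is left unchanged.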