Deduplicate Rows in a Table with Confluent Cloud for Apache Flink¶
Confluent Cloud for Apache Flink® enables generating a table that contains only unique records from an input table in just a few clicks.
In this guide, you create a Flink table and apply the Deduplicate Topic action to generate a topic that has only unique records, by using a deduplication statement. The Deduplicate Topic action creates a Flink SQL statement for you, but no knowledge of Flink SQL is required to use it.
This guide shows the following steps:
- Step 1: Create a users table
- Step 2: Apply the Deduplicate Topic action
- Step 3: Inspect the output table
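The action is built on Flink SQL's standard deduplication pattern: rank the rows within each key by using ROW_NUMBER, then keep only the first row per key. The following is a minimal sketch of that pattern with hypothetical table and column names; in Step 2, the action generates the full statement for you.

```sql
-- Hypothetical example: keep one row per key `k`, choosing the earliest `ts`.
SELECT k, ts, v
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY k ORDER BY ts ASC) AS row_num
  FROM events
)
WHERE row_num = 1;
```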
Prerequisites¶
- Access to Confluent Cloud.
- The OrganizationAdmin, EnvironmentAdmin, or FlinkAdmin role for creating compute pools, or the FlinkDeveloper role if you already have a compute pool. If you don’t have the appropriate role, contact your OrganizationAdmin or EnvironmentAdmin. For more information, see Grant Role-Based Access in Confluent Cloud for Apache Flink.
- A provisioned Flink compute pool.
Step 1: Create a users table¶
Before you can deduplicate rows, you need a table with sample data that contains duplicates. In this step, you create a simple users table and populate it with mock records, some of which are duplicated intentionally.
Log in to Confluent Cloud and navigate to your Flink workspace.
Run the following statement to create a users table.

```sql
CREATE TABLE users (
  user_id STRING NOT NULL,
  registertime BIGINT,
  gender STRING,
  regionid STRING
);
```
Insert rows with mock data into the users table.

```sql
INSERT INTO users VALUES
  ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
  ('Thomas A. Anderson', 1677260724, 'male', 'Region_4'),
  ('Trinity', 1677260733, 'female', 'Region_4'),
  ('Trinity', 1677260733, 'female', 'Region_4'),
  ('Morpheus', 1677260742, 'male', 'Region_8'),
  ('Morpheus', 1677260742, 'male', 'Region_8'),
  ('Dozer', 1677260823, 'male', 'Region_1'),
  ('Agent Smith', 1677260955, 'male', 'Region_0'),
  ('Persephone', 1677260901, 'female', 'Region_2'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Niobe', 1677260921, 'female', 'Region_3'),
  ('Zee', 1677260922, 'female', 'Region_5');
```
Inspect the inserted rows.
```sql
SELECT * FROM users;
```
Your output should resemble:
| user_id            | registertime | gender | regionid |
|--------------------|--------------|--------|----------|
| Thomas A. Anderson | 1677260724   | male   | Region_4 |
| Thomas A. Anderson | 1677260724   | male   | Region_4 |
| Trinity            | 1677260733   | female | Region_4 |
| Trinity            | 1677260733   | female | Region_4 |
| Morpheus           | 1677260742   | male   | Region_8 |
| Morpheus           | 1677260742   | male   | Region_8 |
| Dozer              | 1677260823   | male   | Region_1 |
| Agent Smith        | 1677260955   | male   | Region_0 |
| Persephone         | 1677260901   | female | Region_2 |
| Niobe              | 1677260921   | female | Region_3 |
| Niobe              | 1677260921   | female | Region_3 |
| Niobe              | 1677260921   | female | Region_3 |
| Zee                | 1677260922   | female | Region_5 |
Step 2: Apply the Deduplicate Topic action¶
In the previous step, you created a Flink table that had duplicate rows. In this step, you apply the Deduplicate Topic action to create an output table that has only unique rows.
In the navigation menu, click Data portal.
In the Data portal page, click the Environment dropdown menu and select the environment for your workspace.
In the Recently created section, find your users topic and click it to open the details pane.
Click Actions, and in the Actions list, click Deduplicate topic to open the Deduplicate topic dialog.
In the Fields to deduplicate dropdown, select user_id.
Flink uses the deduplication field as the output message key. This means that the output topic’s row key may be different from the input topic’s row key, because the deduplication statement’s DISTRIBUTED BY clause determines the output topic’s key.
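As a sketch, the part of the generated statement that sets the output key looks like the following excerpt (the table name and environment qualifiers are omitted here; the full statement appears later in this step):

```sql
-- The deduplication field becomes the key of the output topic.
CREATE TABLE users_deduplicate (
  PRIMARY KEY (`user_id`) NOT ENFORCED
) DISTRIBUTED BY HASH(`user_id`)
...
```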
For this example, the output message key is the user_id field.
In the Compute pool dropdown, select the compute pool you want to use.
(Optional) In the Runtime configuration section, select Run with a service account to run the deduplicate query with a service account principal. Use this option for production queries.
Note
The service account you select must have the DeveloperManage and DeveloperWrite roles to create topics, schemas, and run Flink statements. For more information, see Grant Role-Based Access.
Click the Show SQL toggle to view the statement that the action will run.
For this example, the deduplication query depends on the registertime field, so you must modify the generated statement to sort on registertime.
Click Open SQL editor to modify the statement.
A Flink workspace opens with the generated statement in the cell.
In the cell, replace $rowtime with registertime in the ORDER BY clause.

```sql
CREATE TABLE `<your-environment>`.`<your-kafka-cluster>`.`users_deduplicate` (
  PRIMARY KEY (`user_id`) NOT ENFORCED
) DISTRIBUTED BY HASH(`user_id`)
WITH (
  'changelog.mode' = 'upsert',
  'value.format' = 'avro-registry',
  'key.format' = 'avro-registry'
) AS SELECT
  `user_id`,
  `registertime`,
  `gender`,
  `regionid`
FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY `user_id` ORDER BY registertime ASC) AS row_num
  FROM `<your-environment>`.`<your-kafka-cluster>`.`users`
)
WHERE row_num = 1;
```
Click Run to execute the deduplication query.
The CREATE TABLE AS SELECT statement creates the users_deduplicate table and populates it with rows from the users table by using a deduplication query.
When the Statement status changes to Running, you can query the users_deduplicate table.
Step 3: Inspect the output table¶
The statement generated by the Deduplicate Topic action created an output table named users_deduplicate. In this step, you query the output table to see the deduplicated rows.
Run the following statement to inspect the users_deduplicate output table.

```sql
SELECT * FROM users_deduplicate;
```
Your output should resemble:
| user_id            | registertime | gender | regionid |
|--------------------|--------------|--------|----------|
| Thomas A. Anderson | 1677260724   | male   | Region_4 |
| Trinity            | 1677260733   | female | Region_4 |
| Morpheus           | 1677260742   | male   | Region_8 |
| Dozer              | 1677260823   | male   | Region_1 |
| Agent Smith        | 1677260955   | male   | Region_0 |
| Persephone         | 1677260901   | female | Region_2 |
| Niobe              | 1677260921   | female | Region_3 |
| Zee                | 1677260922   | female | Region_5 |
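As an optional sanity check, you can confirm that no key appears more than once in the output. The following query, a sketch of one way to do this, groups by user_id and returns only keys that have more than one row, so a successful deduplication produces an empty result. Note that this is a continuous streaming query, so stop it when you're done.

```sql
-- Returns a row only for keys that still have duplicates; expect no results.
SELECT user_id, COUNT(*) AS copies
FROM users_deduplicate
GROUP BY user_id
HAVING COUNT(*) > 1;
```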