CDC из DynamoDB в ClickHouse
На этой странице описано, как настроить CDC из DynamoDB в ClickHouse с помощью ClickPipes. Существуют 2 компонента этой интеграции:
- Начальный снимок через S3 ClickPipes
- Обновления в реальном времени через Kinesis ClickPipes
Данные будут загружаться в ReplacingMergeTree
. Этот движок таблиц обычно используется для сценариев CDC, чтобы позволить применять операции обновления. Более подробно о этой модели можно найти в следующих статьях блога:
- Change Data Capture (CDC) с PostgreSQL и ClickHouse - Часть 1
- Change Data Capture (CDC) с PostgreSQL и ClickHouse - Часть 2
1. Настройка Kinesis Stream
Сначала вам нужно будет включить поток Kinesis на вашей таблице DynamoDB для захвата изменений в реальном времени. Мы хотим сделать это до создания снимка, чтобы избежать пропуска данных. Найдите руководство AWS, расположенное здесь.

2. Создание снимка
Далее мы создадим снимок таблицы DynamoDB. Это можно сделать с помощью экспорта AWS в S3. Найдите руководство AWS, расположенное здесь. Вы должны сделать "Полный экспорт" в формате JSON DynamoDB.

3. Загрузка снимка в ClickHouse
Создание необходимых таблиц
Данные снимка из DynamoDB будут выглядеть примерно так:
Обратите внимание, что данные находятся в вложенном формате. Нам нужно будет развернуть эти данные перед загрузкой в ClickHouse. Это можно сделать с помощью функции JSONExtract
в ClickHouse в материализованном представлении.
Мы хотим создать три таблицы:
- Таблица для хранения необработанных данных из DynamoDB
- Таблица для хранения окончательных развернутых данных (целевой таблицы)
- Материализованное представление для развертывания данных
Для примера данных DynamoDB, приведенного выше, таблицы ClickHouse будут выглядеть так:
Существуют некоторые требования к целевой таблице:
- Эта таблица должна быть таблицей
ReplacingMergeTree
- Таблица должна иметь колонку
version
- На следующих этапах мы будем сопоставлять поле
ApproximateCreationDateTime
из потока Kinesis с колонкойversion
.
- На следующих этапах мы будем сопоставлять поле
- Таблица должна использовать ключ партиционирования в качестве ключа сортировки (указано с помощью
ORDER BY
)- Строки с одинаковым ключом сортировки будут дедуплицированы на основе колонки
version
.
- Строки с одинаковым ключом сортировки будут дедуплицированы на основе колонки
Создание ClickPipe для снимка
Теперь вы можете создать ClickPipe для загрузки данных снимка из S3 в ClickHouse. Следуйте руководству по ClickPipe для S3 здесь, но используйте следующие настройки:
- Путь загрузки: Вам нужно будет определить путь экспортированных json файлов в S3. Путь будет выглядеть примерно так:
- Формат: JSONEachRow
- Таблица: Ваша таблица снимка (например,
default.snapshot
в приведенном выше примере)
После создания данные начнут заполняться в таблицы снимка и назначения. Вам не нужно ждать завершения загрузки снимка, прежде чем переходить к следующему шагу.
4. Создание Kinesis ClickPipe
Теперь мы можем настроить Kinesis ClickPipe для захвата изменений в реальном времени из потока Kinesis. Следуйте руководству по Kinesis ClickPipe здесь, но используйте следующие настройки:
- Поток: Поток Kinesis, использованный на шаге 1
- Таблица: Ваша таблица назначения (например,
default.destination
в приведенном выше примере) - Развернуть объект: true
- Сопоставление колонок:
ApproximateCreationDateTime
:version
- Сопоставьте другие поля с соответствующими колонками назначения, как показано ниже

5. Очистка (по желанию)
После завершения ClickPipe для снимка вы можете удалить таблицу снимка и материализованное представление.