Перейти к основному содержимому
Перейти к основному содержимому

ClickHouse Kafka Connect Sink

примечание

Если вам нужна помощь, пожалуйста, создайте проблему в репозитории или задайте вопрос в публикации ClickHouse в Slack.

ClickHouse Kafka Connect Sink — это коннектор Kafka, который передает данные из темы Kafka в таблицу ClickHouse.

Лицензия

Коннектор Kafka Sink распространяется под лицензией Apache 2.0

Требования к окружению

В окружении должна быть установлена версия фреймворка Kafka Connect v2.7 или выше.

Матрица совместимости версий

Версия ClickHouse Kafka ConnectВерсия ClickHouseKafka ConnectПлатформа Confluent
1.0.0> 23.3> 2.7> 6.1

Основные функции

  • Поставляется с готовой к использованию семантикой exactly-once. Использует новую функцию ядра ClickHouse под названием KeeperMap (используется в качестве хранилища состояния для коннектора) и позволяет создать минималистичную архитектуру.
  • Поддержка сторонних хранилищ состояния: по умолчанию используется память, но также может использовать KeeperMap (вскоре будет добавлен Redis).
  • Интеграция с ядром: разработан, поддерживается и поддерживается ClickHouse.
  • Непрерывно тестируется с ClickHouse Cloud.
  • Вставка данных с объявленной схемой и без схемы.
  • Поддержка всех типов данных ClickHouse.

Инструкции по установке

Соберите ваши данные для подключения

To connect to ClickHouse with HTTP(S) you need this information:

  • The HOST and PORT: typically, the port is 8443 when using TLS or 8123 when not using TLS.

  • The DATABASE NAME: out of the box, there is a database named default, use the name of the database that you want to connect to.

  • The USERNAME and PASSWORD: out of the box, the username is default. Use the username appropriate for your use case.

The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select the service that you will connect to and click Connect:

Choose HTTPS, and the details are available in an example curl command.

If you are using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.

Общие инструкции по установке

Коннектор распространяется в виде единого JAR-файла, содержащего все классы, необходимые для работы плагина.

Чтобы установить плагин, выполните следующие шаги:

  • Скачайте zip-архив, содержащий JAR-файл коннектора, со страницы Релизы репозитория ClickHouse Kafka Connect Sink.
  • Извлеките содержимое ZIP-файла и скопируйте его в нужное место.
  • Добавьте путь с директорией плагина в конфигурацию plugin.path в вашем файле свойств Connect, чтобы позволить платформе Confluent найти плагин.
  • Укажите имя темы, имя хоста инстанса ClickHouse и пароль в конфигурации.
  • Перезапустите платформу Confluent.
  • Если вы используете платформу Confluent, войдите в интерфейс Confluent Control Center, чтобы убедиться, что ClickHouse Sink доступен в списке доступных коннекторов.

Опции конфигурации

Чтобы подключить ClickHouse Sink к серверу ClickHouse, вам необходимо указать:

  • данные подключения: hostname (обязательно) и port (необязательно)
  • учетные данные пользователя: password (обязательно) и username (необязательно)
  • класс коннектора: com.clickhouse.kafka.connect.ClickHouseSinkConnector (обязательно)
  • темы или topics.regex: темы Kafka для опроса — названия тем должны совпадать с названиями таблиц (обязательно)
  • конвертеры ключей и значений: задавайте в зависимости от типа данных в вашей теме. Обязательно, если не уже определены в конфигурации worker.

Полная таблица опций конфигурации:

Название свойстваОписаниеЗначение по умолчанию
hostname (Обязательно)Имя хоста или IP-адрес сервераN/A
portПорт ClickHouse — значение по умолчанию 8443 (для HTTPS в облаке), но для HTTP (значение по умолчанию для логирования на собственном хосте) это должно быть 81238443
sslВключить ssl-соединение с ClickHousetrue
jdbcConnectionPropertiesПараметры подключения при подключении к ClickHouse. Должны начинаться с ? и соединяться & между param=value""
usernameИмя пользователя базы данных ClickHousedefault
password (Обязательно)Пароль базы данных ClickHouseN/A
databaseНазвание базы данных ClickHousedefault
connector.class (Обязательно)Класс коннектора (явно установлен и оставлен как значение по умолчанию)"com.clickhouse.kafka.connect.ClickHouseSinkConnector"
tasks.maxКоличество задач коннектора"1"
errors.retry.timeoutТаймаут повторного запроса JDBC ClickHouse"60"
exactlyOnceВключен exactly once"false"
topics (Обязательно)Темы Kafka для опроса — названия тем должны совпадать с названиями таблиц""
key.converter (Обязательно* - см. описание)Укажите в зависимости от типов ваших ключей. Обязательно, если вы передаете ключи (и не указаны в конфигурации Worker)."org.apache.kafka.connect.storage.StringConverter"
value.converter (Обязательно* - см. описание)Установите в зависимости от типа данных в вашей теме. Поддерживаются: - форматы JSON, String, Avro или Protobuf. Обязательно, если не указаны в конфигурации Worker."org.apache.kafka.connect.json.JsonConverter"
value.converter.schemas.enableПоддержка схемы конвертера значений"false"
errors.toleranceОшибка толерантности коннектора. Поддерживается: none, all"none"
errors.deadletterqueue.topic.nameЕсли задано (с errors.tolerance=all), будет использоваться DLQ для неудачных партий (см. Устранение неполадок)""
errors.deadletterqueue.context.headers.enableДобавляет дополнительные заголовки для DLQ""
clickhouseSettingsСписок настроек ClickHouse, разделенный запятыми (например, "insert_quorum=2, etc...")""
topic2TableMapСписок, разделенный запятыми, который сопоставляет названия тем с названиями таблиц (например, "topic1=table1, topic2=table2, etc...")""
tableRefreshIntervalВремя (в секундах) для обновления кеша определения таблицы0
keeperOnClusterПозволяет конфигурировать параметр ON CLUSTER для собственных инстансов (например, ON CLUSTER clusterNameInConfigFileDefinition) для таблицы connect_state с exactly-once (см. Распределенные DDL Запросы)""
bypassRowBinaryПозволяет отключить использование RowBinary и RowBinaryWithDefaults для данных на основе схемы (Avro, Protobuf и т.д.) - должен использоваться только когда данные имеют отсутствующие столбцы, и Nullable/Default недопустимы"false"
dateTimeFormatsФорматы даты и времени для парсинга полей схемы DateTime64, разделенные ; (например, someDateField=yyyy-MM-dd HH:mm:ss.SSSSSSSSS;someOtherDateField=yyyy-MM-dd HH:mm:ss).""
tolerateStateMismatchПозволяет коннектору сбрасывать записи "раньше" текущего сохраненного смещения AFTER_PROCESSING (например, если смещение 5 отправлено, а смещение 250 было последним записанным смещением)"false"
ignorePartitionsWhenBatchingИгнорирует партиции при сборе сообщений для вставки (хотя только если exactlyOnce равно false). Примечание по производительности: чем больше задач коннектора, тем меньше партиций Kafka будет назначено на задачу - это может означать уменьшение отдачи."false"

Целевые таблицы

ClickHouse Connect Sink читает сообщения из тем Kafka и записывает их в соответствующие таблицы. ClickHouse Connect Sink записывает данные в существующие таблицы. Пожалуйста, убедитесь, что целевая таблица с подходящей схемой была создана в ClickHouse перед началом вставки данных в неё.

Каждой теме требуется выделенная целевая таблица в ClickHouse. Название целевой таблицы должно совпадать с названием исходной темы.

Предварительная обработка

Если вам нужно преобразовать исходящие сообщения перед отправкой их в ClickHouse Kafka Connect Sink, используйте Преобразования Kafka Connect.

Поддерживаемые типы данных

С объявленной схемой:

Тип Kafka ConnectТип ClickHouseПоддерживаетсяПримитив
STRINGStringДа
INT8Int8Да
INT16Int16Да
INT32Int32Да
INT64Int64Да
FLOAT32Float32Да
FLOAT64Float64Да
BOOLEANBooleanДа
ARRAYArray(T)Нет
MAPMap(Primitive, T)Нет
STRUCTVariant(T1, T2, ...)Нет
STRUCTTuple(a T1, b T2, ...)Нет
STRUCTNested(a T1, b T2, ...)Нет
BYTESStringНет
org.apache.kafka.connect.data.TimeInt64 / DateTime64Нет
org.apache.kafka.connect.data.TimestampInt32 / Date32Нет
org.apache.kafka.connect.data.DecimalDecimalНет

Без объявленной схемы:

Запись преобразуется в JSON и отправляется в ClickHouse как значение в формате JSONEachRow.

Рецепты конфигурации

Это некоторые общие рецепты конфигурации, чтобы быстро начать работу.

Основная конфигурация

Самая простая конфигурация, чтобы начать — предполагает, что вы запускаете Kafka Connect в распределенном режиме и у вас есть сервер ClickHouse, работающий на localhost:8443 с включенным SSL, данные находятся в формате JSON без схемы.

Основная конфигурация с несколькими темами

Коннектор может потреблять данные из нескольких тем.

Основная конфигурация с DLQ

Использование с различными форматами данных

Поддержка схемы Avro
Поддержка схемы Protobuf

Пожалуйста, обратите внимание: если вы столкнетесь с проблемами с отсутствующими классами, не каждая среда поставляется с конвертером protobuf, и вам может понадобиться альтернативная версия JAR с зависимостями.

Поддержка схемы JSON
Поддержка строк

Коннектор поддерживает конвертер String в различных форматах ClickHouse: JSON, CSV и TSV.

Логирование

Логирование предоставляется автоматически платформой Kafka Connect. Место назначения и формат логирования могут быть настроены через файл конфигурации Kafka Connect.

При использовании платформы Confluent логи можно увидеть, выполнив команду CLI:

Для получения дополнительной информации ознакомьтесь с официальным учебником.

Мониторинг

ClickHouse Kafka Connect сообщает о показателях времени через Java Management Extensions (JMX). JMX включен в коннекторе Kafka по умолчанию.

MBeanName ClickHouse Connect:

ClickHouse Kafka Connect сообщает следующие метрики:

НазваниеТипОписание
receivedRecordslongОбщее количество полученных записей.
recordProcessingTimelongОбщее время в наносекундах, затраченное на группировку и преобразование записей в унифицированную структуру.
taskProcessingTimelongОбщее время в наносекундах, затраченное на обработку и вставку данных в ClickHouse.

Ограничения

  • Удаления не поддерживаются.
  • Размер партии унаследован от свойств потребителя Kafka.
  • При использовании KeeperMap для exactly-once и изменении или перематывании смещения необходимо удалить содержимое из KeeperMap для конкретной темы. (Пожалуйста, смотрите руководство по устранению неполадок ниже для получения дополнительных деталей)

Настройка производительности

Если вы когда-либо думали: "Я хотел бы настроить размер партии для коннектора sink", тогда этот раздел для вас.

Fetch Connect против Подсчета коннектора

Kafka Connect (фреймворк, на котором построен наш коннектор sink) будет в фоновом режиме получать сообщения из тем Kafka (независимо от коннектора).

Вы можете контролировать этот процесс с помощью fetch.min.bytes и fetch.max.bytes — в то время как fetch.min.bytes устанавливает минимальное количество, необходимое, прежде чем фреймворк передаст значения коннектору (до предельного времени, установленного fetch.max.wait.ms), fetch.max.bytes устанавливает верхний предельный размер. Если вы хотите передать более крупные партии коннектору, одним из вариантов может быть увеличение минимального извлечения или максимального ожидания, чтобы сформировать более крупные пакеты данных.

Эти извлеченные данные затем потребляются клиентом коннектора, который опрашивает сообщения, где количество для каждого опроса контролируется параметром max.poll.records — обратите внимание, что извлечение независимо от опроса!

При настройке этих параметров пользователи должны стремиться к тому, чтобы их размер извлечения создавал несколько партий max.poll.records (и учитывать, что параметры fetch.min.bytes и fetch.max.bytes представляют сжатые данные) — так, чтобы каждая задача коннектора вставляла как можно большую партию.

ClickHouse оптимизирован для больших партий, даже с незначительной задержкой, а не для частых, но меньших партий — чем больше партия, тем лучше.

Более подробную информацию можно найти в документации Confluent или в документации Kafka.

Несколько тем с высоким объемом трафика

Если ваш коннектор настроен на подписку на несколько тем, вы используете topic2TableMap, чтобы сопоставить темы с таблицами, и вы испытываете узкое место при вставке, приводящее к задержке потребителя, подумайте о создании одного коннектора на тему. Главная причина, по которой это происходит, заключается в том, что в настоящее время партии вставляются в каждую таблицу по очереди.

Создание одного коннектора на тему является временным решением, которое обеспечивает максимально возможную скорость вставки.

Устранение неполадок

"Несоответствие состояния для темы [someTopic] партиции [0]"

Это происходит, когда смещение, сохраненное в KeeperMap, отличается от смещения, сохраненного в Kafka, обычно, когда тема была удалена или смещение было вручную отрегулировано. Чтобы исправить это, вам нужно удалить старые значения для данной темы + партиции.

ПРИМЕЧАНИЕ: Это регулирование может иметь последствия для exactly-once.

"Какие ошибки будет повторять коннектор?"

В настоящее время акцент сделан на выявлении ошибок, которые являются временными и могут быть повторными, включая:

  • ClickHouseException — это общее исключение, которое может быть вызвано ClickHouse. Обычно возникает, когда сервер перегружен, и следующие коды ошибок считаются особенно временными:
    • 3 - UNEXPECTED_END_OF_FILE
    • 159 - TIMEOUT_EXCEEDED
    • 164 - READONLY
    • 202 - TOO_MANY_SIMULTANEOUS_QUERIES
    • 203 - NO_FREE_CONNECTION
    • 209 - SOCKET_TIMEOUT
    • 210 - NETWORK_ERROR
    • 242 - TABLE_IS_READ_ONLY
    • 252 - TOO_MANY_PARTS
    • 285 - TOO_FEW_LIVE_REPLICAS
    • 319 - UNKNOWN_STATUS_OF_INSERT
    • 425 - SYSTEM_ERROR
    • 999 - KEEPER_EXCEPTION
    • 1002 - UNKNOWN_EXCEPTION
  • SocketTimeoutException — выбрасывается, когда время ожидания сокета истекает.
  • UnknownHostException — возникает, когда хост не может быть разрешен.
  • IOException — выбрасывается, когда возникают проблемы с сетью.

"Все мои данные пустые/нулевые"

Скорее всего, поля в ваших данных не совпадают с полями в таблице — это особенно часто бывает с CDC (и форматом Debezium). Одно из распространенных решений — добавить преобразование flatten в конфигурацию вашего коннектора:

Это преобразует ваши данные из вложенного JSON в плоский JSON (используя _ в качестве разделителя). Затем поля в таблице будут следовать формату "field1_field2_field3" (например, "before_id", "after_id" и т.д.).

"Я хочу использовать свои ключи Kafka в ClickHouse"

Ключи Kafka по умолчанию не хранятся в поле значений, но вы можете использовать преобразование KeyToValue, чтобы переместить ключ в поле значений (под новым названием поля _key):