Коннектор Confluent HTTP sink
Коннектор HTTP Sink не зависит от типа данных, поэтому ему не требуется схема Kafka, при этом он также поддерживает специфические для ClickHouse типы данных, такие как Maps и Arrays. Эта дополнительная гибкость достигается ценой немного более сложной конфигурации.
Ниже мы описываем простую установку, при которой сообщения считываются из одного топика Kafka, а строки вставляются в таблицу ClickHouse.
Коннектор HTTP распространяется по лицензии Confluent Enterprise.
Быстрый старт
1. Соберите параметры подключения
Чтобы подключиться к ClickHouse по HTTP(S) вам потребуется следующая информация:
| Параметр(ы) | Описание |
|---|---|
HOST and PORT | Typically, the port is 8443 when using TLS or 8123 when not using TLS. |
DATABASE NAME | Out of the box, there is a database named default, use the name of the database that you want to connect to. |
USERNAME and PASSWORD | Out of the box, the username is default. Use the username appropriate for your use case. |
The details for your ClickHouse Cloud service are available in the ClickHouse Cloud console. Select a service and click Connect:

Choose HTTPS. Connection details are displayed in an example curl command.

If you're using self-managed ClickHouse, the connection details are set by your ClickHouse administrator.
2. Запустите Kafka Connect и коннектор HTTP Sink
У вас есть два варианта:
-
Самостоятельное развертывание: Загрузите пакет Confluent и установите его локально. Следуйте инструкциям по установке коннектора, приведённым здесь. Если вы используете метод установки через confluent-hub, ваши локальные файлы конфигурации будут обновлены.
-
Confluent Cloud: Полностью управляемая версия HTTP Sink доступна для тех, кто использует Confluent Cloud для хостинга Kafka. Для этого требуется, чтобы ваша среда ClickHouse была доступна из Confluent Cloud.
В следующих примерах используется Confluent Cloud.
3. Создайте целевую таблицу в ClickHouse
Перед проверкой соединения создадим тестовую таблицу в ClickHouse Cloud, которая будет получать данные из Kafka:
4. Настройте HTTP Sink
Создайте топик Kafka и экземпляр HTTP Sink Connector:

Настройте HTTP Sink Connector:
- Укажите имя созданного топика
- Authentication
HTTP Url— URL-адрес ClickHouse Cloud с указанным запросомINSERT<protocol>://<clickhouse_host>:<clickhouse_port>?query=INSERT%20INTO%20<database>.<table>%20FORMAT%20JSONEachRow. Примечание: запрос должен быть закодирован.Endpoint Authentication type— BASICAuth username— имя пользователя ClickHouseAuth password— пароль ClickHouse
С этим HTTP Url легко допустить ошибку. Убедитесь, что экранирование выполнено точно, чтобы избежать проблем.

- Configuration
Input Kafka record value formatзависит от ваших исходных данных, но в большинстве случаев это JSON или Avro. В дальнейших настройках мы предполагаем форматJSON.- В разделе
advanced configurations:HTTP Request Method— установите POSTRequest Body Format— jsonBatch batch size— согласно рекомендациям ClickHouse установите значение не менее 1000.Batch json as array— trueRetry on HTTP codes— 400-500, но адаптируйте при необходимости, например, это может измениться, если у вас есть HTTP-прокси перед ClickHouse.Maximum Reties— значение по умолчанию (10) подходит, но вы можете скорректировать его для более надежных повторных попыток.

5. Тестирование подключения
Создайте сообщение в топике, настроенном для вашего HTTP Sink

и убедитесь, что созданное сообщение было записано в ваш экземпляр ClickHouse.
Устранение неполадок
HTTP Sink не объединяет сообщения в батчи
Коннектор HTTP Sink не объединяет запросы для сообщений, содержащих значения заголовков Kafka, которые различаются.
- Убедитесь, что ваши записи Kafka имеют одинаковый ключ.
- Когда вы добавляете параметры к URL-адресу HTTP API, каждая запись может приводить к уникальному URL-адресу. По этой причине батчирование отключается при использовании дополнительных параметров URL.
400 bad request
CANNOT_PARSE_QUOTED_STRING
Если HTTP Sink завершает работу с ошибкой со следующим сообщением при вставке JSON-объекта в столбец типа String:
Установите настройку input_format_json_read_objects_as_strings=1 в URL, указав её как URL‑кодированную строку SETTINGS%20input_format_json_read_objects_as_strings%3D1
Загрузка набора данных GitHub (необязательно)
Обратите внимание, что в этом примере сохраняются поля типа Array из набора данных GitHub. Мы предполагаем, что в вашем примере есть пустой топик github и что вы используете kcat для отправки сообщений в Kafka.
1. Подготовьте конфигурацию
Следуйте этим инструкциям по настройке Connect в соответствии с типом вашей установки, обращая внимание на различия между автономным и распределённым кластерами. Если вы используете Confluent Cloud, вам подходит распределённая схема.
Наиболее важным параметром является http.api.url. HTTP‑интерфейс для ClickHouse требует, чтобы вы закодировали выражение INSERT как параметр в URL. Оно должно включать формат (в данном случае JSONEachRow) и целевую базу данных. Формат должен соответствовать формату данных в Kafka, которые будут преобразованы в строку в теле HTTP‑запроса. Эти параметры должны быть URL‑кодированы. Пример такого формата для набора данных Github (предполагая, что вы запускаете ClickHouse локально) показан ниже:
Следующие дополнительные параметры относятся к использованию HTTP Sink с ClickHouse. Полный список параметров можно найти здесь:
request.method- Установите в POST.retry.on.status.codes- Установите в 400–500 для повторных попыток при любых кодах ошибок. Уточните значение на основе ожидаемых ошибок в данных.request.body.format- В большинстве случаев это будет JSON.auth.type- Установите в BASIC, если вы используете базовую аутентификацию в ClickHouse. Другие механизмы аутентификации, совместимые с ClickHouse, в настоящее время не поддерживаются.ssl.enabled- установите в true при использовании SSL.connection.user- имя пользователя для ClickHouse.connection.password- пароль для ClickHouse.batch.max.size- Количество строк, отправляемых в одном пакете. Убедитесь, что это значение достаточно велико. Согласно рекомендациям ClickHouse, значение 1000 следует считать минимальным.tasks.max- Коннектор HTTP Sink поддерживает выполнение одной или нескольких задач. Это можно использовать для повышения производительности. Совместно с размером пакета это ваши основные средства улучшения производительности.key.converter- задайте в соответствии с типами ваших ключей.value.converter- задайте исходя из типа данных в вашем топике. Для этих данных не требуется схема. Формат здесь должен быть согласован с FORMAT, указанным в параметреhttp.api.url. Проще всего использовать JSON и конвертер org.apache.kafka.connect.json.JsonConverter. Также возможно рассматривать значение как строку с помощью конвертера org.apache.kafka.connect.storage.StringConverter, однако это потребует от пользователя извлечения значения в операторе INSERT с использованием функций. Формат Avro также поддерживается в ClickHouse при использовании конвертера io.confluent.connect.avro.AvroConverter.
Полный список настроек, включая конфигурацию прокси, повторные попытки и расширенный SSL, можно найти здесь.
Примеры файлов конфигурации для примера данных GitHub можно найти здесь, при условии, что Connect запущен в автономном режиме, а Kafka развернута в Confluent Cloud.
2. Создайте таблицу ClickHouse
Убедитесь, что таблица создана. Ниже приведён пример минимального набора данных GitHub, использующего стандартный движок MergeTree.
3. Добавьте данные в Kafka
Отправьте сообщения в Kafka. Ниже мы используем kcat для отправки 10 000 сообщений.
Простой запрос к целевой таблице «Github» должен подтвердить, что данные были вставлены.