卡夫卡的藏书阁【Book10】- Kafka Connect 1

“Books are a narcotic.”
― Franz Kafka
突然想起了恐龙书


Kafka Connect 是一个可靠的、可随情境扩增或是缩减的资料传输工具,用来处理 Kafka 跟其他资料储存系统间的资料传输,透过定义 connector 可以轻易地从 Kafka 传入和传出大量资料。

Kafka Connect 可以消化掉整个资料库、搜集你app server的资料集,放入到 Kafka topics 中,确保资料在低延迟下可以被取用。汇出方面,Connector 可以将 Kafka topics 的资料传送给 Elasticsearch 或是离线分析用的Hadoop系统。

Kafka Connect 可以分为汇入的 source connector 跟汇出 sink connector,目前可以支援的范围很广,你可以将 Microsoft SQL Server、MQTT、Java JDBC、IMB MQ、salesforce、JSON档案、poster Sql、CSV档案、Mysql...等资料透过 source connector 汇入 Kafka topic,再透过 sink connector 将资料汇出到 Google BigQuery、hadoop、Amazon S3、elasticsearch、snowflake、ORACLE、各类DB...等。

坐而言不如起而行,今天会带大家简单实作一个小练习,我们将会透过 Kafka Connect 让两个资料库资料对接,做到类似ETL的功能,主要分为三个步骤:

  1. 设定来源资料库跟目标资料库
  2. 下载所需的 connect 和设定 Kafka connect-distributed
  3. 新增 Source connector
  4. 新增 Sink connector

Step 1: 设定来源资料库跟目标资料库

首先,在本地的 Mysql 创建一个来源资料库跟目标资料库

mysql> create database `source_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.00 sec)

mysql> create database `target_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.01 sec)

在来源资料库 source_database 和 target_database 各创建一张表当作资料来源

mysql> use source_database;
Database changed
mysql> CREATE TABLE `source_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,   
`nickname` VARCHAR(20) NOT NULL,   
PRIMARY KEY (`id`) 
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.05 sec)

mysql> use target_database;
Database changed
mysql> CREATE TABLE `target_users` (
  `id` INT(11) NOT NULL AUTO_INCREMENT,
  `username` VARCHAR(20) NOT NULL,
  `nickname` VARCHAR(20) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.03 sec)

Step 2: 下载所需的 connect 和设定 Kafka connect-distributed

  • 下载 Confluent 的 JDBC connect
    • 将档案下载到 Kafka Server 所在的机器
  • 因为今天练习是用 Mysql,所以还需要下载 maven mysql connect (Maven Repository: mysql » mysql-connector-java)
    • wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
  • 解压缩 unzip confluentinc-kafka-connect-jdbc-10.2.1.zip
  • 创建资料夹 mkdir -p kafkaConnect/lib
  • 将刚刚下载的 jdbc 档案都移到资料夹中,mysql 的 connector 移到 lib 资料夹中
mv confluentinc-kafka-connect-jdbc-10.2.1 kafkaConnect/
mv mysql-connector-java-8.0.20.jar kafkaConnect/lib/

我们今天是采用 connector 的 distributed 模式,另外还有 standalone 模式,官方建议线上采用 distributed 的模式,因为可扩增性、可用性和管理等各方面都更佳

  • 需要修改相对应的设定档 connect-distributed.properties
bootstrap.servers=127.0.0.1:9092 # 指到 Broker Server 的 IP 位子
group.id=connect-cluster
rest.port=8083 # REST 介面监听的 port,预设是8083,顺便一题如果你是用 connect 的 standalone 模式,预设 port 是 8084。
plugin.path=/usr/local/etc/kafkaConnect # 刚刚创建资料夹的绝对路径
  • 执行 kafka connect
    • connect-distributed /usr/local/etc/kafka/connect-distributed.properties
  • 启动完成後,可以查看目前的 connector 的 plugin
$ curl 'http://127.0.0.1:8083/connector-plugins'
[
   {
      "class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "type":"sink",
      "version":"10.2.1"
   },
   {
      "class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "type":"source",
      "version":"10.2.1"
   },
   {
      "class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
      "type":"sink",
      "version":"2.8.0"
   },
   {
      "class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
      "type":"source",
      "version":"2.8.0"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "type":"source",
      "version":"1"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
      "type":"source",
      "version":"1"
   },
   {
      "class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "type":"source",
      "version":"1"
   }
]

<<:  Day9:终於要进去新手村了-Javascript-运算子

>>:  Day 10 - API 文件导览总结 - 重点整理

第 57 天 - 才知道 && 用法

今天才知道用 && 也可以串接,并且左边的命令失败,右边的命令将不会运行 test@...

EP 10: Passing Data for Navigation in TopStore App - I

Hello, 各位 iT邦帮忙 的粉丝们大家好~~~ 本篇是 Re: 从零开始用 Xamarin 技...

架站:Wordpress

为何选择Wordpress? 虽然内容管理系统(CMS)也有其他的选择(例如 Joomla!),但整...

JavaScript Day 29. 立即函式 IIFE

立即函式,也称 Immediately Invoked Function Expression,简称...

Material UI in React [ Day 25 ] Styles Advanced

今天要讲解的内容,在前面讲解theme的应用时,有稍微讲解了一些基本的应用,官方文件内前半部的内容我...