“Books are a narcotic.”
― Franz Kafka
突然想起了恐龙书
Kafka Connect 是一个可靠的、可随情境扩增或是缩减的资料传输工具,用来处理 Kafka 跟其他资料储存系统间的资料传输,透过定义 connector 可以轻易地从 Kafka 传入和传出大量资料。
Kafka Connect 可以消化掉整个资料库、搜集你app server的资料集,放入到 Kafka topics 中,确保资料在低延迟下可以被取用。汇出方面,Connector 可以将 Kafka topics 的资料传送给 Elasticsearch 或是离线分析用的Hadoop系统。
Kafka Connect 可以分为汇入的 source connector 跟汇出 sink connector,目前可以支援的范围很广,你可以将 Microsoft SQL Server、MQTT、Java JDBC、IMB MQ、salesforce、JSON档案、poster Sql、CSV档案、Mysql...等资料透过 source connector 汇入 Kafka topic,再透过 sink connector 将资料汇出到 Google BigQuery、hadoop、Amazon S3、elasticsearch、snowflake、ORACLE、各类DB...等。
坐而言不如起而行,今天会带大家简单实作一个小练习,我们将会透过 Kafka Connect 让两个资料库资料对接,做到类似ETL的功能,主要分为三个步骤:
首先,在本地的 Mysql 创建一个来源资料库跟目标资料库
mysql> create database `source_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.00 sec)
mysql> create database `target_database` default character set utf8mb4 collate utf8mb4_unicode_ci;
Query OK, 1 row affected (0.01 sec)
在来源资料库 source_database 和 target_database 各创建一张表当作资料来源
mysql> use source_database;
Database changed
mysql> CREATE TABLE `source_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,
`nickname` VARCHAR(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.05 sec)
mysql> use target_database;
Database changed
mysql> CREATE TABLE `target_users` (
`id` INT(11) NOT NULL AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,
`nickname` VARCHAR(20) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
Query OK, 0 rows affected (0.03 sec)
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.2.1/confluentinc-kafka-connect-jdbc-10.2.1.zip
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.20/mysql-connector-java-8.0.20.jar
unzip confluentinc-kafka-connect-jdbc-10.2.1.zip
mkdir -p kafkaConnect/lib
mv confluentinc-kafka-connect-jdbc-10.2.1 kafkaConnect/
mv mysql-connector-java-8.0.20.jar kafkaConnect/lib/
我们今天是采用 connector 的 distributed 模式,另外还有 standalone 模式,官方建议线上采用 distributed 的模式,因为可扩增性、可用性和管理等各方面都更佳
connect-distributed.properties
bootstrap.servers=127.0.0.1:9092 # 指到 Broker Server 的 IP 位子
group.id=connect-cluster
rest.port=8083 # REST 介面监听的 port,预设是8083,顺便一题如果你是用 connect 的 standalone 模式,预设 port 是 8084。
plugin.path=/usr/local/etc/kafkaConnect # 刚刚创建资料夹的绝对路径
connect-distributed /usr/local/etc/kafka/connect-distributed.properties
$ curl 'http://127.0.0.1:8083/connector-plugins'
[
{
"class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"type":"sink",
"version":"10.2.1"
},
{
"class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"type":"source",
"version":"10.2.1"
},
{
"class":"org.apache.kafka.connect.file.FileStreamSinkConnector",
"type":"sink",
"version":"2.8.0"
},
{
"class":"org.apache.kafka.connect.file.FileStreamSourceConnector",
"type":"source",
"version":"2.8.0"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
"type":"source",
"version":"1"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
"type":"source",
"version":"1"
},
{
"class":"org.apache.kafka.connect.mirror.MirrorSourceConnector",
"type":"source",
"version":"1"
}
]
<<: Day9:终於要进去新手村了-Javascript-运算子
>>: Day 10 - API 文件导览总结 - 重点整理
今天才知道用 && 也可以串接,并且左边的命令失败,右边的命令将不会运行 test@...
Hello, 各位 iT邦帮忙 的粉丝们大家好~~~ 本篇是 Re: 从零开始用 Xamarin 技...
为何选择Wordpress? 虽然内容管理系统(CMS)也有其他的选择(例如 Joomla!),但整...
立即函式,也称 Immediately Invoked Function Expression,简称...
今天要讲解的内容,在前面讲解theme的应用时,有稍微讲解了一些基本的应用,官方文件内前半部的内容我...