卡夫卡的藏书阁【Book11】- Kafka Connect 2

Step3. 新增 Source connector

  • 可以查看一下当前的 connector
$ curl 'http://127.0.0.1:8083/connectors'
[]
  • 接着要新增 source connector:
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{
"name":"test-upload-source-mysql",
"config":{
    "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=资料库密码",
    "table.whitelist":"source_users",
    "incrementing.column.name": "id",
    "mode":"incrementing",
    "topic.prefix": "test-mysql-"}
}'

这边说明 config 参数:

  • name:指定新增 connector 的名称
  • config:指定 connector 的设定资讯
    • connector.class:使用哪个 connector 类别
    • connection.url:连结 Mysql 的 url
    • table.whitelist:下载哪些表格
    • incrementing.column.name:增长的栏位名称
    • mode:指定 connector 的模式
    • topic.prefix:Kafka会新增一个 Topic,这边是指令该 Topic 的前缀,最後产生的名称会是前缀加上表格名称,Ex. test-mysql-source_users

建立成功会出现以下讯息:

HTTP/1.1 201 Created
Date: Sun, 29 Aug 2021 10:11:24 GMT
Location: http://127.0.0.1:8083/connectors/test-upload-source-mysql
Content-Type: application/json
Content-Length: 377
Server: Jetty(9.4.39.v20210325)

{
   "name":"test-upload-source-mysql",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
      "connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=资料库密码",
      "table.whitelist":"source_users",
      "incrementing.column.name":"id",
      "mode":"incrementing",
      "topic.prefix":"test-mysql-",
      "name":"test-upload-source-mysql"
   },
   "tasks":[],
   "type":"source"
}
  • 查看当前建立的 connectors
$ curl http://127.0.0.1:8083/connectors
["test-upload-source-mysql"]
  • 查看 connector 运行状态
$ curl http://127.0.0.1:8083/connectors/test-upload-source-mysql/status
{
   "name":"test-upload-source-mysql",
   "connector":{
      "state":"RUNNING",
      "worker_id":"192.168.133.118:8083"
   },
   "tasks":[
      {
         "id":0,
         "state":"RUNNING",
         "worker_id":"192.168.133.118:8083"
      }
   ],
   "type":"sink"
}
  • 这边整理 REST API
    | Method | REST API | 说明 |
    | —————— | ——————- | ———- |
    | GET | /connectors | 取得所有正在运作中的 connector |
    | POST | /connectors | 新增一个 connector |
    | GET | /connectors/{name} | 取得指定 connector 的资讯 |
    | GET | /connectors/{name}/config | 取得指定 connector 的设定资讯 |
    | PUT | /connectors/{name}/config | 修改指定 connector 的设定资讯 |
    | GET | /connectors/{name}/status | 取得指定 connector 的运行状态(运行中、停止、失败),如果有发生错误,也会显示具体的错误资讯 |
    | GET | /connectors/{name}/tasks | 取得指定 connector 运行中的 task |
    | GET | /connectors/{name}/tasks/{tasksId}/status | 取得指定 connector 指令的 task 状态 |
    | PUT | /connectors/{name}/pause | 暂停指定的 connector 和它的 task |
    | PUT | /connectors/{name}/resume | 恢复一个暂停中的 connector |
    | POST | /connectors/{name}/restart | 重新启动一个 connector |
    | POST | /connectors/{name}/tasks/{taskID}/restart | 重新启动一个 task |
    | DELETE | /connectors/{name} | 删除一个 connector,停止它的所有 task 并且删除相关 config |

  • 正常运作的 connect 会将 mysql 改变的资料送给 topic test-mysql-sink_users,可以用 consumer 去看资料内容

  • 在资料库新增几笔资料

INSERT INTO source_users(`username`, `nickname`) VALUES('小熊维尼', 'polar bear');
INSERT INTO source_users(`username`, `nickname`) VALUES('大谷翔平', '笑死');
INSERT INTO source_users(`username`, `nickname`) VALUES('邓不利多', '校长');
  • 开启 consumer,可以看到刚刚 mysql 新增的资料已 JSON 格式存在 topic中。
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-source_users --from-beginning

{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":1,
      "username":"小熊维尼",
      "nickname":"polar bear"
   }
}{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":2,
      "username":"大谷翔平",
      "nickname":"笑死"
   }
}{
   "schema":{
      "type":"struct",
      "fields":[
         {
            "type":"int32",
            "optional":false,
            "field":"id"
         },
         {
            "type":"string",
            "optional":false,
            "field":"username"
         },
         {
            "type":"string",
            "optional":false,
            "field":"nickname"
         }
      ],
      "optional":false,
      "name":"source_users"
   },
   "payload":{
      "id":3,
      "username":"邓不利多",
      "nickname":"校长"
   }
}

到这边 source connector 已经新增、设定成功,资料也有同步到 Kafka topic了,接下来要新增 sink connector

Step 4: 新增 Sink Connector

curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{"name":"test-download-to-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/target_database",
"connection.user":"root",
"connection.password":"ifalo.net",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format": "target_users"}}'

HTTP/1.1 201 Created
Date: Mon, 30 Aug 2021 09:28:58 GMT
Location: http://127.0.0.1:8083/connectors/test-download-to-mysql
Content-Type: application/json
Content-Length: 444
Server: Jetty(9.4.39.v20210325)

{"name":"test-download-to-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306","connection.user":"root","connection.password":"ifalo.net","topics":"test-mysql-source_users","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"id","table.name.format":"target_database.target_users","name":"test-download-to-mysql"},"tasks":[],"type":"sink"}

参数说明:

  • name:指定新增 connector 的名称

  • config:新增 connector 的设定资讯

    • connector.class:使用哪个 connector 类别
    • connection.url:Mysql 连接的 url
    • topics:从哪个 topic 读取资料
    • auto.create:是否自动新建表格
    • insert.mode:写入的模式,这边选用 upsert
    • pk.mode:选择主键模式 record_value 从讯息的 value 中取得资料
    • pk.fields:pk 栏位名称
    • table.name.format:指定输出到资料库哪个表格
  • 建立完成後,一样可以查看设定和状态

$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/config 
{
   "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
   "table.name.format":"target_database.target_users",
   "connection.password":"资料库密码",
   "connection.user":"root",
   "topics":"test-mysql-source_users",
   "name":"test-download-to-mysql",
   "auto.create":"false",
   "connection.url":"jdbc:mysql://127.0.0.1:3306",
   "insert.mode":"upsert",
   "pk.mode":"record_value",
   "pk.fields":"id"
}

$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/status
{
   "name":"test-download-to-mysql",
   "config":{
      "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
      "connection.url":"jdbc:mysql://127.0.0.1:3306",
      "connection.user":"root",
      "connection.password":"资料库密码",
      "topics":"test-mysql-source_users",
      "auto.create":"false",
      "insert.mode":"upsert",
      "pk.mode":"record_value",
      "pk.fields":"id",
      "table.name.format":"target_database.target_users",
      "name":"test-download-to-mysql"
   },
   "tasks":[
      
   ],
   "type":"sink"
}
  • 确认状态是运行中好,查看是否成功同步 source_database 的资料
mysql> use target_database;
Database changed
mysql> select * from target_users;
+----+--------------+------------+
| id | username     | nickname   |
+----+--------------+------------+
|  1 | 小熊维尼     | polar bear |
|  2 | 大谷翔平     | 笑死       |
|  3 | 邓不利多     | 校长       |
+----+--------------+------------+
3 rows in set (0.00 sec)

今天的练习到此结束,如果想要将刚刚建立的 connector 删掉,一样是呼叫 REST API即可

$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-upload-source-mysql'
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-download-to-mysql'

<<:  25.移转 Aras PLM大小事-流程签核动态指派(4)

>>:  #9-数字动态好棒棒!(Vanilla JS requestAnimationFrame)

Day 08:深仍可测的元件样式-Deep Selectors

成功渲染出 BootstrapVue Navbar 元件之後,接着来将预设样式改成自己喜欢的专案色调...

Day02-容器化管理工具(Docker)

前言 今天就要正式进入 DevOps 的环节了,首先要来介绍的是 Docker,之所以要先介绍 D...

Day07:部门与工程团队间协作的技巧(上)

一、前言   所谓的团队合作,我想最重要的就是如何沟通与使用有效之协作工具!沟通属於较偏人性与软性之...

#25 No-code 之旅 — 实作 Notion 部落格 Pagination (分页) 功能 ft. SWR

嗨大家!像昨天说的,今天会讲怎麽用 SWR 实作 Notion 部落格的 pagination (分...

基本元件

在使用基本元件时,使用的观念与时下常见的IDE工具相类似。我们由布局开始构思,如果有需要多页面,则需...