$ curl 'http://127.0.0.1:8083/connectors'
[]
$ curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{
"name":"test-upload-source-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=资料库密码",
"table.whitelist":"source_users",
"incrementing.column.name": "id",
"mode":"incrementing",
"topic.prefix": "test-mysql-"}
}'
这边说明 config 参数:
建立成功会出现以下讯息:
HTTP/1.1 201 Created
Date: Sun, 29 Aug 2021 10:11:24 GMT
Location: http://127.0.0.1:8083/connectors/test-upload-source-mysql
Content-Type: application/json
Content-Length: 377
Server: Jetty(9.4.39.v20210325)
{
"name":"test-upload-source-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/kafka_store?user=root&password=资料库密码",
"table.whitelist":"source_users",
"incrementing.column.name":"id",
"mode":"incrementing",
"topic.prefix":"test-mysql-",
"name":"test-upload-source-mysql"
},
"tasks":[],
"type":"source"
}
$ curl http://127.0.0.1:8083/connectors
["test-upload-source-mysql"]
$ curl http://127.0.0.1:8083/connectors/test-upload-source-mysql/status
{
"name":"test-upload-source-mysql",
"connector":{
"state":"RUNNING",
"worker_id":"192.168.133.118:8083"
},
"tasks":[
{
"id":0,
"state":"RUNNING",
"worker_id":"192.168.133.118:8083"
}
],
"type":"sink"
}
这边整理 REST API
| Method | REST API | 说明 |
| —————— | ——————- | ———- |
| GET | /connectors | 取得所有正在运作中的 connector |
| POST | /connectors | 新增一个 connector |
| GET | /connectors/{name} | 取得指定 connector 的资讯 |
| GET | /connectors/{name}/config | 取得指定 connector 的设定资讯 |
| PUT | /connectors/{name}/config | 修改指定 connector 的设定资讯 |
| GET | /connectors/{name}/status | 取得指定 connector 的运行状态(运行中、停止、失败),如果有发生错误,也会显示具体的错误资讯 |
| GET | /connectors/{name}/tasks | 取得指定 connector 运行中的 task |
| GET | /connectors/{name}/tasks/{tasksId}/status | 取得指定 connector 指令的 task 状态 |
| PUT | /connectors/{name}/pause | 暂停指定的 connector 和它的 task |
| PUT | /connectors/{name}/resume | 恢复一个暂停中的 connector |
| POST | /connectors/{name}/restart | 重新启动一个 connector |
| POST | /connectors/{name}/tasks/{taskID}/restart | 重新启动一个 task |
| DELETE | /connectors/{name} | 删除一个 connector,停止它的所有 task 并且删除相关 config |
正常运作的 connect 会将 mysql 改变的资料送给 topic test-mysql-sink_users,可以用 consumer 去看资料内容
在资料库新增几笔资料
INSERT INTO source_users(`username`, `nickname`) VALUES('小熊维尼', 'polar bear');
INSERT INTO source_users(`username`, `nickname`) VALUES('大谷翔平', '笑死');
INSERT INTO source_users(`username`, `nickname`) VALUES('邓不利多', '校长');
kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test-mysql-source_users --from-beginning
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":1,
"username":"小熊维尼",
"nickname":"polar bear"
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":2,
"username":"大谷翔平",
"nickname":"笑死"
}
}{
"schema":{
"type":"struct",
"fields":[
{
"type":"int32",
"optional":false,
"field":"id"
},
{
"type":"string",
"optional":false,
"field":"username"
},
{
"type":"string",
"optional":false,
"field":"nickname"
}
],
"optional":false,
"name":"source_users"
},
"payload":{
"id":3,
"username":"邓不利多",
"nickname":"校长"
}
}
到这边 source connector 已经新增、设定成功,资料也有同步到 Kafka topic了,接下来要新增 sink connector
curl -X POST -H 'Content-Type: application/json' -i 'http://127.0.0.1:8083/connectors' \
--data \
'{"name":"test-download-to-mysql","config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306/target_database",
"connection.user":"root",
"connection.password":"ifalo.net",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode": "upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format": "target_users"}}'
HTTP/1.1 201 Created
Date: Mon, 30 Aug 2021 09:28:58 GMT
Location: http://127.0.0.1:8083/connectors/test-download-to-mysql
Content-Type: application/json
Content-Length: 444
Server: Jetty(9.4.39.v20210325)
{"name":"test-download-to-mysql","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306","connection.user":"root","connection.password":"ifalo.net","topics":"test-mysql-source_users","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"id","table.name.format":"target_database.target_users","name":"test-download-to-mysql"},"tasks":[],"type":"sink"}
参数说明:
name:指定新增 connector 的名称
config:新增 connector 的设定资讯
建立完成後,一样可以查看设定和状态
$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/config
{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"table.name.format":"target_database.target_users",
"connection.password":"资料库密码",
"connection.user":"root",
"topics":"test-mysql-source_users",
"name":"test-download-to-mysql",
"auto.create":"false",
"connection.url":"jdbc:mysql://127.0.0.1:3306",
"insert.mode":"upsert",
"pk.mode":"record_value",
"pk.fields":"id"
}
$ curl http://127.0.0.1:8083/connectors/test-download-to-mysql/status
{
"name":"test-download-to-mysql",
"config":{
"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:mysql://127.0.0.1:3306",
"connection.user":"root",
"connection.password":"资料库密码",
"topics":"test-mysql-source_users",
"auto.create":"false",
"insert.mode":"upsert",
"pk.mode":"record_value",
"pk.fields":"id",
"table.name.format":"target_database.target_users",
"name":"test-download-to-mysql"
},
"tasks":[
],
"type":"sink"
}
mysql> use target_database;
Database changed
mysql> select * from target_users;
+----+--------------+------------+
| id | username | nickname |
+----+--------------+------------+
| 1 | 小熊维尼 | polar bear |
| 2 | 大谷翔平 | 笑死 |
| 3 | 邓不利多 | 校长 |
+----+--------------+------------+
3 rows in set (0.00 sec)
今天的练习到此结束,如果想要将刚刚建立的 connector 删掉,一样是呼叫 REST API即可
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-upload-source-mysql'
$ curl -X DELETE 'http://127.0.0.1:8083/connectors/test-download-to-mysql'
<<: 25.移转 Aras PLM大小事-流程签核动态指派(4)
>>: #9-数字动态好棒棒!(Vanilla JS requestAnimationFrame)
成功渲染出 BootstrapVue Navbar 元件之後,接着来将预设样式改成自己喜欢的专案色调...
前言 今天就要正式进入 DevOps 的环节了,首先要来介绍的是 Docker,之所以要先介绍 D...
一、前言 所谓的团队合作,我想最重要的就是如何沟通与使用有效之协作工具!沟通属於较偏人性与软性之...
嗨大家!像昨天说的,今天会讲怎麽用 SWR 实作 Notion 部落格的 pagination (分...
在使用基本元件时,使用的观念与时下常见的IDE工具相类似。我们由布局开始构思,如果有需要多页面,则需...