One minute
Kafka Connect Start
最近在弄rocketmq-connect, 顺便参考 kafka-connect 的原理和实践. 这里做一下记录.
前置工作
- 打包file模块
- gradle connect:file:build -x test
- 指定 plugin 路径
- 修改 config/connect-standalone.properties 中 plugin.path 为你的路径
- 将 file 模块打包的 jar 移动到 plugin.path 中
- 创建topic
- ./kafka-topics.sh –create –topic connect-test –replication-factor 1 –partitions 1 –zookeeper localhost:2181
- ./kafka-topics.sh –list –zookeeper localhost:2181
启动 file source
- 启动kafka, 参考 kafka-start
- 创建connect的source文件: filesource.txt, 文件内容如下:
Hello
World
One
Step
of
Kafka
- 修改 config/connect-file-source.properties 文件, 替换file变量
file=/Users/snow_young/deploy/data/filesource.txt
- intellij 启动 ConnectStandalone (这次会失败, 需要配置)
- ConnectStandalone 添加程序参数: config/connect-standalone.properties config/connect-file-source.properties
- 指定log4j配置文件
- 第一种: ConnectStandalone 添加VM Options: -Dlog4j.configuration=file:你的kafka路径/kafka/config/connect-log4j.properties
- 第二种: 将 config/connect-log4j.properties 复制到 runtime项目的 resources 文件夹下面,重命名为 log4j.properties
- 设置日志路径: 在 log4j.properties 中添加 kafka.logs.dir=你自己的路径. 我这里是 /Users/snow_young/deploy/log
- 将 compile libs.slf4jlog4j 复制到 build.gradle文件的 connect-runtime 项目的 dependencies 中
启动 file sink
- 修改 config/connect-file-source.properties 修改 sink 文件: file=/Users/snow_young/deploy/data/filesink.txt, 替换成自己的文件路径就可以了
- 项目参数添加 config/connect-standalone.properties config/connect-file-source.properties
- 如果是同时启动 file-source 和 file-sink 的话, 需要处理rest端口重复的问题. 在 config/connect-standalone.properties 添加配置 rest.port=8090, 端口替换成其他的不冲突的端口就可以
- 查看 file-sink 文件, 可以发现内容已经输出了.
-> % cat filesink.txt
Hello
World
One
Step
of
Kafka
同时启动source 和 sink
- 项目参数添加 config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
145 Words
2019-10-27 11:08 +0800