最近在弄rocketmq-connect, 顺便参考 kafka-connect 的原理和实践. 这里做一下记录.

前置工作

  • 打包file模块
    • gradle connect:file:build -x test
  • 指定 plugin 路径
    • 修改 config/connect-standalone.properties 中 plugin.path 为你的路径
    • 将 file 模块打包的 jar 移动到 plugin.path 中
  • 创建topic
    • ./kafka-topics.sh –create –topic connect-test –replication-factor 1 –partitions 1 –zookeeper localhost:2181
    • ./kafka-topics.sh –list –zookeeper localhost:2181

启动 file source

  • 启动kafka, 参考 kafka-start
  • 创建connect的source文件: filesource.txt, 文件内容如下: Hello World One Step of Kafka
  • 修改 config/connect-file-source.properties 文件, 替换file变量
    file=/Users/snow_young/deploy/data/filesource.txt
  • intellij 启动 ConnectStandalone (这次会失败, 需要配置)
  • ConnectStandalone 添加程序参数: config/connect-standalone.properties config/connect-file-source.properties
  • 指定log4j配置文件
    • 第一种: ConnectStandalone 添加VM Options: -Dlog4j.configuration=file:你的kafka路径/kafka/config/connect-log4j.properties
    • 第二种: 将 config/connect-log4j.properties 复制到 runtime项目的 resources 文件夹下面,重命名为 log4j.properties
  • 设置日志路径: 在 log4j.properties 中添加 kafka.logs.dir=你自己的路径. 我这里是 /Users/snow_young/deploy/log
  • 将 compile libs.slf4jlog4j 复制到 build.gradle文件的 connect-runtime 项目的 dependencies 中

启动 file sink

  • 修改 config/connect-file-source.properties 修改 sink 文件: file=/Users/snow_young/deploy/data/filesink.txt, 替换成自己的文件路径就可以了
  • 项目参数添加 config/connect-standalone.properties config/connect-file-source.properties
  • 如果是同时启动 file-source 和 file-sink 的话, 需要处理rest端口重复的问题. 在 config/connect-standalone.properties 添加配置 rest.port=8090, 端口替换成其他的不冲突的端口就可以
  • 查看 file-sink 文件, 可以发现内容已经输出了.
-> % cat filesink.txt
Hello
World
One
Step
of
Kafka

同时启动source 和 sink

  • 项目参数添加 config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties