目前需要对Kafka Topic中的数据进行分析,查询了一下KSQLDB挺不错,它是一个流处理引擎,主要用于处理实时数据流,并支持 SQL 查询和流处理操作。KSQLDB 可以运行在 Apache Kafka 平台之上,它不需要额外的基础设施,因此可以方便地与 Kafka 进行集成。KSQLDB 可以实现流数据的可视化、数据的清洗和去重、流式计算等。KSQLDB 的主要特点包括易于使用、接近实时的处理速度、强大的 SQL 查询功能以及灵活的流处理操作。
环境准备
- MacBook Pro (13-inch, 2020, Four Thunderbolt 3 ports)
- Docker & Docker compose
下面代码用于部署Standalone
1 | --- |
启动KsqlDB服务:
1 | docker-compose up |
使用Docker链接到Cli中
1 | docker exec -it ksqldb-cli |
链接到cli中后,在终端运行命令链接:
1 | [ksqldb-cli] docker exec -it ksqldb-cli ksql http://ksqldb-server:8088 |
可以运行命令查看topic
1 | show topics; |
如果你想链接远端Kafka集群,将
1 | --- |
后面的操作和上面都一样了,启动的时候注意修改名称
Example 1: 创建流表并查询
展示有多少Topic:
1 | show topics; |
在cli链接后,我们创建一个riderLocations流,如果locations这个topic在kafka中不存在,则会对应创建。
1 | CREATE STREAM riderLocations ( |
然后往topic中插入一批数据
1 | kafka-console-producer --broker-list localhost:9092 --topic locations |
就可以愉快的查询它了
1 | SET 'auto.offset.reset'='earliest'; |
更多的详细用法可以参考网站ksqldb