大数据Hadoop之——Kafka 图形化工具 EFAK(EFAK环境部署)
suiw9 2024-10-30 05:43 25 浏览 0 评论
一、概述
EFAK(Eagle For Apache Kafka,以前称为 Kafka Eagle)是一款由国内公司开源的Kafka集群监控系统,可以用来监视kafka集群的broker状态、Topic信息、IO、内存、consumer线程、偏移量等信息,并进行可视化图表展示。独特的KQL还可以通过SQL在线查询kafka中的数据。
源码: https://github.com/smartloli/kafka-eagle/
下载: http://download.kafka-eagle.org/
官方文档:https://www.kafka-eagle.org/articles/docs/documentation.html
二、EFAK架构
EFAK分布式模式部署,这里以5个节点为例子(1个Master和4个Slave),各个节点的角色如下如所示:
三、EFAK数据采集原理
对于 Kafka,我们可以收集以下数据
- Kafka broker常用机器加载信息:内存、cpu、IP、版本等。
- 服务监控数据:TPS、QPS、RT等
- 应用程序监控:组、消费者、生产者、主题等。
因为EFAK是kafka的监控系统,所以前提是需先安装Kafka和Zookeeper。
四、安装Kafka
kafka官网文档:https://kafka.apache.org/documentation/
1)Kafka下载
$ cd /opt/bigdata/hadoop/software
$ wget https://dlcdn.apache.org/kafka/3.1.1/kafka_2.13-3.1.1.tgz
$ tar -xf kafka_2.13-3.1.1.tgz -C /opt/bigdata/hadoop/server/
2)配置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
3)创建logs目录
$ mkdir $KAFKA_HOME/logs
4)修改kafka配置
$ cd $KAFKA_HOME
# 查看现有配置,去掉空行和注释
$ cat config/server.properties |grep -v '^$\|^#'
$ cat > $KAFKA_HOME/config/server.properties <<EOF
#broker的全局唯一编号,不能重复
broker.id=0
#删除topic功能使能
delete.topic.enable=true
#处理网络请求的线程数量
num.network.threads=3
#用来处理磁盘IO的现成数量
num.io.threads=8
#发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
#请求套接字的缓冲区大小
socket.request.max.bytes=104857600
#kafka数据的存储位置
log.dirs=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/logs
#topic在当前broker上的分区个数
num.partitions=1
#用来恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
#segment文件保留的最长时间,超时将被删除
log.retention.hours=168
#配置连接Zookeeper集群地址
zookeeper.connect=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
#zookeeper连接超时时间
zookeeper.connection.timeout.ms=60000
EOF
5)修改zookeeper配置
# 创建zookeeper data和logs目录
$ mkdir $KAFKA_HOME/zookeeper_data $KAFKA_HOME/zookeeper_logs
$ vi $KAFKA_HOME/config/zookeeper.properties
# 配置主要修改如下:
#数据目录
dataDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_data
#日志目录
dataLogDir=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1/zookeeper_logs
#心跳间隔时间,zookeeper中使用的基本时间单位,毫秒值。每隔2秒发送一个心跳,session时间tickTime*2
tickTime=2000
#leader与客户端连接超时时间。表示5个心跳间隔
initLimit=5
#Leader与Follower之间的超时时间,表示2个心跳间隔
syncLimit=2
#客户端连接端口,默认端口2181
clientPort=12181
# zookeeper集群配置项,server.1,server.2,server.3是zk集群节点;hadoop-node1,hadoop-node2,hadoop-node3是主机名称;2888是主从通信端口;3888用来选举leader
server.1=hadoop-node1:2888:3888
server.2=hadoop-node2:2888:3888
server.3=hadoop-node3:2888:3888
6)配置Zookeeper myid
# 在hadoop-node1配置如下:
$ echo 1 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node2配置如下:
$ echo 2 > $KAFKA_HOME/zookeeper_data/myid
# 在hadoop-node3配置如下:
$ echo 3 > $KAFKA_HOME/zookeeper_data/myid
7)开启Kafka JMX监控
# 在kafka-server-start.sh文件中添加export JMX_PORT="9988",端口自定义就行
$ vi $KAFKA_HOME/bin/kafka-server-start.sh
重启kafka
$ $KAFKA_HOME/bin/kafka-server-stop.sh ; $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
【问题】如遇以下报错:
ERROR Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.InconsistentClusterIdException: The Cluster ID ELIH-KKbRP-NnPnHt4z-lA doesn't match stored clusterId Some(2HC_x7bTR_u2bCxrqw0Otw) in meta.properties. The broker is trying to join the wrong cluster. Configured zookeeper.connect may be wrong. at kafka.server.KafkaServer.startup(KafkaServer.scala:228) at kafka.Kafka$.main(Kafka.scala:109) at kafka.Kafka.main(Kafka.scala)
【解决】在server.properties找到log.dirs配置的路径。将该路径下的meta.properties文件删除,或者编辑meta.properties文件修改里面的cluster.id即可。
8)将kafka目录推送到其它节点
$ scp -r $KAFKA_HOME hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r $KAFKA_HOME hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3节点上设置环境变量
$ vi /etc/profile
export KAFKA_HOME=/opt/bigdata/hadoop/server/kafka_2.13-3.1.1
export PATH=$PATH:$KAFKA_HOME/bin
$ source /etc/profile
# 修改advertised.listeners,值改成对应的hostname或者ip
【温馨提示】修改hadoop-node2上server.properties文件的broker.id,设置为1和2,只要不重复就行,advertised.listeners地址改成对应机器IP。
9)启动服务
启动zookeeper集群之后再启动kafka集群
$ cd $KAFKA_HOME
# -daemon后台启动
$ ./bin/zookeeper-server-start.sh -daemon ./config/zookeeper.properties
# 默认端口2181,可以在配置自定义,这里修改为12181端口
$ netstat -tnlp|grep 12181
启动Kafka
$ cd $KAFKA_HOME
# 默认端口9092,这里修改成了19092,可以修改listeners和advertised.listeners
$ ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ netstat -tnlp|grep 9092
$ jps
五、安装EFAK
1)下载EFAK
$ cd /opt/bigdata/hadoop/software
$ wget https://github.com/smartloli/kafka-eagle-bin/archive/v2.1.0.tar.gz
$ tar -xf kafka-eagle-bin-2.1.0.tar.gz -C /opt/bigdata/hadoop/server/
$ cd /opt/bigdata/hadoop/server/
2)创建数据库
$ mysql -uroot -p
123456
create database ke;
3)设置环境变量
$ vi /etc/profile
export KE_HOME=/opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0/efak-web-2.1.0
export PATH=$PATH:$KE_HOME/bin
$ source /etc/profile
4)配置
这里设置hadoop-node1为master节点,其它两个节点为slave节点,修改参数
$ vi $KE_HOME/conf/system-config.properties
# Multi zookeeper&kafka cluster list -- The client connection address of the Zookeeper cluster is set here
efak.zk.cluster.alias=cluster1
cluster1.zk.list=hadoop-node1:12181,hadoop-node2:12181,hadoop-node3:12181
######################################
# kafka jmx 地址,默认Apache发布的Kafka基本是这个默认值,
# 对于一些公有云Kafka厂商,它们会修改这个值,
# 比如会将jmxrmi修改为kafka或者是其它的值,
# 若是选择的公有云厂商的Kafka,可以根据实际的值来设置该属性
######################################
cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi
# Zkcli limit -- Zookeeper cluster allows the number of clients to connect to
# If you enable distributed mode, you can set value to 4 or 8
kafka.zk.limit.size=16
# EFAK webui port -- WebConsole port access address
efak.webui.port=8048
######################################
# EFAK enable distributed,启用分布式部署
######################################
efak.distributed.enable=true
# 设置节点类型slave or master
# master worknode set status to master, other node set status to slave
efak.cluster.mode.status=master
# deploy efak server address
efak.worknode.master.host=hadoop-node1
efak.worknode.port=8085
# Kafka offset storage -- Offset stored in a Kafka cluster, if stored in the zookeeper, you can not use this option
cluster1.efak.offset.storage=kafka
# Whether the Kafka performance monitoring diagram is enabled
efak.metrics.charts=true
# EFAK keeps data for 30 days by default
efak.metrics.retain=15
# If offset is out of range occurs, enable this property -- Only suitable for kafka sql
efak.sql.fix.error=false
efak.sql.topic.records.max=5000
# Delete kafka topic token -- Set to delete the topic token, so that administrators can have the right to delete
efak.topic.token=keadmin
# 关闭自带的sqlite数据库,使用外部的mysql数据库
# Default use sqlite to store data
# efak.driver=org.sqlite.JDBC
# It is important to note that the '/hadoop/kafka-eagle/db' path must be exist.
# efak.url=jdbc:sqlite:/hadoop/kafka-eagle/db/ke.db
# efak.username=root
# efak.password=smartloli
# 配置外部数据库
# (Optional) set mysql address
efak.driver=com.mysql.jdbc.Driver
efak.url=jdbc:mysql://hadoop-node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull
efak.username=root
efak.password=123456
5)调整启动参数
EFAK默认启动内存大小为2G,考虑到服务器情况可以将其调小
## 在 efak 安装目录执行
$ vi $KE_HOME/bin/ke.sh
## 将 KE_JAVA_OPTS 最大最小容量调小,例如:
export KE_JAVA_OPTS="-server -Xmx512m -Xms512m -XX:MaxGCPauseMillis=20 -XX:+UseG1GC -XX:MetaspaceSize=128m -XX:InitiatingHeapOccupancyPercent=35 -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=50 -XX:MaxMetaspaceFreeRatio=80"
6)修改 works配置
默认是localhost
$ cat >$KE_HOME/conf/works<<EOF
hadoop-node2
hadoop-node3
EOF
7)将EFAK推送其它节点
# hadoop-node1推送
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node2:/opt/bigdata/hadoop/server/
$ scp -r /opt/bigdata/hadoop/server/kafka-eagle-bin-2.1.0 hadoop-node3:/opt/bigdata/hadoop/server/
# 在hadoop-node2和hadoop-node3配置环境变量修改节点类型为slave
8)启动
上面配置文件配置的是分布式部署,当然也可以单机跑,但是不建议单机跑
# 在master节点上执行
$ cd $KE_HOME/bin
$ chmod +x ke.sh
# 单机版启动
$ ke.sh start
# 集群方式启动
$ ke.sh cluster start
$ ke.sh cluster restart
web UI:http://192.168.0.113:8048/
账号密码:admin/123456
六、简单使用
1)Kafka CLI简单使用
【增】添加topic
# 创建topic,1副本,1分区,设置数据过期时间72小时(-1表示不过期),单位ms,72*3600*1000=259200000
$ kafka-topics.sh --create --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --partitions 1 --replication-factor 1 --config retention.ms=259200000
【查】
# 查看topic列表
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --list
# 查看topic列表详情
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe
# 指定topic
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002
# 查看消费者组
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --list
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002
【改】这里主要是修改最常用的三个参数:分区、副本,过期时间
# 修改分区,扩分区,不能减少分区
$ kafka-topics.sh --alter --bootstrap-server hadoop-node1:9092 --topic test002 --partitions 2
# 修改过期时间,下面两行都可以
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --topic test002 --add-config retention.ms=86400000
$ kafka-configs.sh --bootstrap-server hadoop-node1:9092 --alter --entity-name test002 --entity-type topics --add-config retention.ms=86400000
# 修改副本数,将副本数修改成3
$ cat >1.json<<EOF
{"version":1,
"partitions":[
{"topic":"test002","partition":0,"replicas":[0,1,2]},
{"topic":"test002","partition":1,"replicas":[1,2,0]},
{"topic":"test002","partition":2,"replicas":[2,0,1]}
]}
EOF
$ kafka-topics.sh --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092 --describe --topic test002
【删】
$ kafka-topics.sh --delete --topic test002 --bootstrap-server hadoop-node1:9092,hadoop-node2:9092,hadoop-node3:9092
【生成者】
$ kafka-console-producer.sh --broker-list hadoop-node1:9092 --topic test002
{"id":"1","name":"n1","age":"20"}
{"id":"2","name":"n2","age":"21"}
{"id":"3","name":"n3","age":"22"}
【消费者】
# 从头开始消费
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --from-beginning
# 指定从分区的某个位置开始消费,这里只指定了一个分区,可以多写几行或者遍历对应的所有分区
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --partition 0 --offset 100
【消费组】
$ kafka-console-consumer.sh --bootstrap-server hadoop-node1:9092 --topic test002 --group test002
【查看数据积压】
$ kafka-consumer-groups.sh --bootstrap-server hadoop-node1:9092 --describe --group test002
2)EFAK常用命令
$KE_HOME/bin/ke.sh启动脚本中包含以下命令:
命令 | 描述 |
ke.sh start | 启动 EFAK 服务器。 |
ke.sh status | 查看 EFAK 运行状态。 |
ke.sh stop | 停止 EFAK 服务器。 |
ke.sh restart | 重新启动 EFAK 服务器。 |
ke.sh stats | 查看 linux 操作系统中的 EFAK 句柄数。 |
ke.sh find [ClassName] | 在 jar 中找到类名的位置。 |
ke.sh gc | 查看 EFAK 进程 gc。 |
ke.sh version | 查看 EFAK 版本。 |
ke.sh jdk | 查看 EFAK 安装的 jdk 详细信息。 |
ke.sh sdate | 查看 EFAK 启动日期。 |
ke.sh cluster start | 查看 EFAK 集群分布式启动。 |
ke.sh cluster status | 查看 EFAK 集群分布式状态。 |
ke.sh cluster stop | 查看 EFAK 集群分布式停止。 |
ke.sh cluster restart | 查看 EFAK 集群分布式重启。 |
EFAK环境部署,和kafka的一些简单操作就到这里了,后续会分享KSQL和其它更详细的页面化操作,请小伙伴耐心等待~
相关推荐
- 看完这一篇数据仓库干货,终于搞懂什么是hive了
-
一、Hive定义Hive最早来源于FaceBook,因为FaceBook网站每天产生海量的结构化日志数据,为了对这些数据进行管理,并且因为机器学习的需求,产生了Hive这们技术,并继续发展成为一个成...
- 真正让你明白Hive参数调优系列1:控制map个数与性能调优参数
-
本系列几章系统地介绍了开发中Hive常见的用户配置属性(有时称为参数,变量或选项),并说明了哪些版本引入了哪些属性,常见有哪些属性的使用,哪些属性可以进行Hive调优,以及如何使用的问题。以及日常Hi...
- HIVE SQL基础语法(hive sql是什么)
-
引言与关系型数据库的SQL略有不同,但支持了绝大多数的语句如DDL、DML以及常见的聚合函数、连接查询、条件查询。HIVE不适合用于联机事务处理,也不提供实时查询功能。它最适合应用在基于大量不可变数据...
- [干货]Hive与Spark sql整合并测试效率
-
在目前的大数据架构中hive是用来做离线数据分析的,而在Spark1.4版本中spark加入了sparksql,我们知道spark的优势是速度快,那么到底sparksql会比hive...
- Hive 常用的函数(hive 数学函数)
-
一、Hive函数概述及分类标准概述Hive内建了不少函数,用于满足用户不同使用需求,提高SQL编写效率:...
- 数仓/数开面试题真题总结(二)(数仓面试时应该讲些什么)
-
二.Hive...
- Tomcat处理HTTP请求流程解析(tomcat 处理请求过程)
-
1、一个简单的HTTP服务器在Web应用中,浏览器请求一个URL,服务器就把生成的HTML网页发送给浏览器,而浏览器和服务器之间的传输协议是HTTP,那么接下来我们看下如何用Java来实现一个简单...
- Python 高级编程之网络编程 Socket(六)
-
一、概述Python网络编程是指使用Python语言编写的网络应用程序。这种编程涉及到网络通信、套接字编程、协议解析等多种方面的知识。...
- [904]ScalersTalk成长会Python小组第20周学习笔记
-
Scalers点评:在2015年,ScalersTalk成长会Python小组完成了《Python核心编程》第1轮的学习。到2016年,我们开始第二轮的学习,并且将重点放在章节的习题上。Python小...
- 「web开发」几款http请求测试工具
-
curl命令CURL(CommandLineUniformResourceLocator),是一个利用URL语法,在命令行终端下使用的网络请求工具,支持HTTP、HTTPS、FTP等协议...
- Mac 基于HTTP方式访问下载共享文件,配置共享服务器
-
方法一:使用Python的SimpleHTTPServer进行局域网文件共享Mac自带Python,所以不需要安装其他软件,一条命令即可...
- 使用curl进行http高并发访问(php curl 大量并发获得结果)
-
本文主要介绍curl异步接口的使用方式,以及获取高性能的一些思路和实践。同时假设读者已经熟悉并且使用过同步接口。1.curl接口基本介绍curl一共有三种接口:EasyInterface...
- Django 中的 HttpResponse理解和用法-基础篇1
-
思路是方向,代码是时间,知识需积累,经验需摸索。希望对大家有用,有错误还望指出。...
你 发表评论:
欢迎- 一周热门
-
-
Linux:Ubuntu22.04上安装python3.11,简单易上手
-
宝马阿布达比分公司推出独特M4升级套件,整套升级约在20万
-
MATLAB中图片保存的五种方法(一)(matlab中保存图片命令)
-
别再傻傻搞不清楚Workstation Player和Workstation Pro的区别了
-
如何提取、修改、强刷A卡bios a卡刷bios工具
-
Linux上使用tinyproxy快速搭建HTTP/HTTPS代理器
-
Element Plus 的 Dialog 组件实现点击遮罩层不关闭对话框
-
日本组合“岚”将于2020年12月31日停止团体活动
-
SpringCloud OpenFeign 使用 okhttp 发送 HTTP 请求与 HTTP/2 探索
-
MacOS + AList + 访达,让各种云盘挂载到本地(建议收藏)
-
- 最近发表
- 标签列表
-
- dialog.js (57)
- importnew (44)
- windows93网页版 (44)
- yii2框架的优缺点 (45)
- tinyeditor (45)
- qt5.5 (60)
- windowsserver2016镜像下载 (52)
- okhttputils (51)
- android-gif-drawable (53)
- 时间轴插件 (56)
- docker systemd (65)
- slider.js (47)
- android webview缓存 (46)
- pagination.js (59)
- loadjs (62)
- openssl1.0.2 (48)
- velocity模板引擎 (48)
- pcre library (47)
- zabbix微信报警脚本 (63)
- jnetpcap (49)
- pdfrenderer (43)
- fastutil (48)
- uinavigationcontroller (53)
- bitbucket.org (44)
- python websocket-client (47)