Hadoop提供了集中式存储系统,方便集中数据分析和数据共享。 Hadoop对存储格式没有要求。可存储用户访问日志、产品信息、网页数据等数据。
两个常见数据源。一是数据源分散:机器生成的数据、用户访问日志、用户购买日志。另一种是传统系统中的数据:传统关系数据库(MySQL、Oracle)、磁盘阵列、磁带。
Flume 由三部分组成。 Master负责通信和配置管理,是集群的控制器。收集器用于聚合数据。通常会导致更大的数据流。然后加载到HDFS。 Agent负责收集数据。是 Flume 中数据流生成的地方,Agent 将生成的数据传输给 Collector。
Agent用于收集数据流产生的地方的数据。通常由两部分组成:Source 和Sink。 Source用于获取数据,可以从文本文件、syslog、HTTP等中获取。Sink进一步将Source获取到的数据传输给后续的Collector。 Flume 附带了许多源和接收器实现。系统日志 TCP(5140) | agentSink("localhost",35853)表示通过通信协议获取5140端口数据,然后发送到localhost主机的35853端口。尾部(“/etc/services”)| agentSink("localhost",35853) 的意思是读取/etc/services文件夹下的文件,然后发送到localhost主机的35853端口。
Collector用于汇总多个Agent结果,并将汇总结果导入后端存储系统,如HDFS、HBase等。 Flume 附带了许多收集器实现collectorSource(35853) | console 表示将采集结果写入控制台。收藏家来源(35853) | CollectorSink("file:///tmp/flume/collected", "syslog") 表示将收集结果写入本地目录。收集器来源(35853) | CollectorSink("hdfs://namenode/user/flume/","syslog");表示将采集结果写入hdfs目录。
一般情况下,为了防止一台收集器故障而导致所有代理失效,可以将不同的代理连接到不同的收集器。同时,这可以保证不同数据类型的代理将数据放入同一个收集器中。
Master负责管理和协调Agent和Collector的配置信息,是Flume集群的控制器。它可以跟踪数据流的最终确认信息并通知代理。通常需要配置多个master来防止单点故障。使用zookeeper管理多个Master。
构建基于Flume的数据采集系统
Agent 和 Collector 都可以动态配置。可通过命令行或 Web 界面进行配置。命令行配置:在启动的master节点上依次输入“flume shell”→“connect localhost”。例如,执行 exec config a1 'tailDir("/data/logfile")' 'agentSink'。 Web界面:选择节点并填写源、宿等信息。
常见架构示例—拓扑1
agentA:尾部(“/ngnix/logs”)| agentSink(“收集器”,35853); agentB:尾部(“/ngnix/logs”)| agentSink(“收集器”,35853); agentC:尾部(“/ngnix/logs”)| agentSink("收集器",35853); agentD : tail(“/ngnix/logs”) | agentSink("收集器",35853); agentE:尾部(“/ngnix/logs”)| agentSink("收集器",35853); agentF:尾部(“/ngnix/logs”)| agentSink("收集器",35853);收集器:收集器源(35853)| CollectorSink("hdfs://namenode/flume/","srcdata");
代理A:src | agentE2ESink("collectorA",35853); 代理B:src | agentE2ESink("collectorA",35853); agentC:src | agentE2ESink("collectorB",35853); 代理D:src | agentE2ESink("collectorB",35853); 代理E:src | agentE2ESink("collectorC",35853); agentF:src | agentE2ESink("collectorC",35853); collectorA:collectorSource(35853)| CollectorSink("hdfs://...","src"); collectorB:collectorSource(35853)| CollectorSink("hdfs://...","src"); collectorC:collectorSource(35853)| CollectorSink("hdfs://...","src");
代理B:src | agentE2EChain("collectorA:35853","collectorC:35853"); agentC:src | agentE2EChain("collectorB:35853","collectorA:35853"); 代理D:src | agentE2EChain("collectorB:35853","collectorC:35853"); 代理E:src | agentE2EChain("collectorC:35853","collectorA:35853"); agentF:src | agentE2EChain("collectorC:35853","collectorB:35853"); collectorA:collectorSource(35853)| CollectorSink("hdfs://...","src"); collectorB:collectorSource(35853)| CollectorSink("hdfs://...","src"); collectorC:collectorSource(35853)| CollectorSink("hdfs://...","src"); 传统数据库与Hadoop之间的数据同步 Sqoop:SQL-to-Hadoop,连接传统关系数据库和Hadoop的桥梁。关系数据库中的数据可以导入到Hadoop系统(例如HDFS HBase和Hive)中,也可以从Hadoop系统中提取数据并导出到关系数据库中。 MapReduce可以用来加速数据传输,实现数据传输的批处理。 Sqoop的优势在于资源的高效可控利用、任务并行、超时等。数据类型映射和转换可以自动化或由用户定制。同时支持MySQL、Oracle、PostgreSQL等多种数据库。
您可以使用sqoop命令生成map任务,将传统数据库中的数据传输到HDFS、Hbase或Hive。 Sqoop import 将数据从关系数据库导入到 Hadoop
第一步:Sqoop与数据库服务器通信,获取数据库表的元数据信息;步骤2:Sqoop启动MapOnly MR作业,并使用元数据信息并行将数据写入Hadoop。 Sqoop 导入用法: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市 --connnect:指定 JDBC URL --用户名/密码:mysql数据库的用户名 --table:要读取的数据库表 1,美国,帕洛阿尔托 2,捷克共和国,布尔诺 3,美国,桑尼维尔 导入到指定目录: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --target-dir /etl/input/cities 导入字段国家值为USA: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --其中“国家 = '美国'” 生成序列文件: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --作为序列文件 指定地图数量: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --num-mappers 10 Sqoop 导入—导入多个表: sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --查询'www.sychzs.cn,\ 国家.国家,\ www.sychzs.cn \ 来自普通城市\ 使用(country_id)加入国家\ $条件在哪里'\ --按id分割\ --目标目录城市 Sqoop导入增量导入(一): 适用于每次向数据库追加数据,但现有数据不变,仅导入id列值大于1的记录的情况。 sqoop导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表访问\ --增量追加\ --检查列id \ --最后值1 Sqoop导入增量导入(2) 每次成功运行后,sqoop都会将最后一条记录的id值保存到metastore中以供下次使用。 sqoop 工作\ --创建访问\ --导入\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表访问\ --增量追加\ --检查列id \ --最后值0 运行 sqoop 作业:sqoop job --exec 访问 Sqoop导入增量导入(3) 数据库中有一列last_update_date,记录了最后一次修改时间。 Sqoop 仅在一定时间后将数据导入 Hadoop。 --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表访问\ --增量lastmodified\ --检查列last_update_date \ --最后值“2013-05-22 01:01:01” Sqoop 导出 将数据从 Hadoop 导入关系数据库
第一步:Sqoop与数据库服务器通信,获取数据库表的元数据信息;步骤2:并行导入数据:将Hadoop上的文件分成若干个Split;每个分割均由映射任务导入。 如何使用Sqoop导出 --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --export-dir 城市 --用户名/密码:mysql数据库的用户名 --table:要导入的数据库表 export-dir:HDFS上的数据存储目录 Sqoop 导出—保证原子性: --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --staging-table staging_cities Sqoop 导出—更新数据: sqoop 导出\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --update-key id sqoop 导出\ --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --更新密钥id \ --更新模式允许插入 Sqoop 导出—选择性插入: --连接jdbc:mysql://www.sychzs.cn/sqoop \ --用户名sqoop\ --密码sqoop\ --表城市\ --列国家、城市 Sqoop与其他系统结合
bin/hadoop fs -cat cars/part-m-*
sqoop导入\
sqoop导出\
--connnect:指定 JDBC URL
sqoop 导出 \
sqoop 导出\
sqoop import \
Sqoop 与 HBase 结合:
sqoop import \
相关文章
Python3.7将普通图片(png)转换为SV
2023-10-11 23:56安卓root权限获取方法(安卓root权限获取)
2023-10-11 23:36手机卡取款有手续费吗?零月手机卡是如何扣费的-信
2023-10-11 23:17html日历作业表代码,自定义透明桌面课程表带日
2023-10-11 22:57ubuntu上安装pur-ftpd
2023-10-11 22:37