当前位置:人工智能 > Hadoop数据采集及仓储系统Flume和Sqoop

Hadoop数据采集及仓储系统Flume和Sqoop

  • 发布:2023-10-10 20:28

-->

Hadoop提供了集中式存储系统,方便集中数据分析和数据共享。 Hadoop对存储格式没有要求。可存储用户访问日志、产品信息、网页数据等数据。

两个常见数据源。一是数据源分散:机器生成的数据、用户访问日志、用户购买日志。另一种是传统系统中的数据:传统关系数据库(MySQL、Oracle)、磁盘阵列、磁带。

Flume 由三​​部分组成。 Master负责通信和配置管理,是集群的控制器。收集器用于聚合数据。通常会导致更大的数据流。然后加载到HDFS。 Agent负责收集数据。是 Flume 中数据流生成的地方,Agent 将生成的数据传输给 Collector。

Agent用于收集数据流产生的地方的数据。通常由两部分组成:Source 和Sink。 Source用于获取数据,可以从文本文件、syslog、HTTP等中获取。Sink进一步将Source获取到的数据传输给后续的Collector。 Flume 附带了许多源和接收器实现。系统日志 TCP(5140) | agentSink("localhost",35853)表示通过通信协议获取5140端口数据,然后发送到localhost主机的35853端口。尾部(“/etc/services”)| agentSink("localhost",35853) 的意思是读取/etc/services文件夹下的文件,然后发送到localhost主机的35853端口。

Collector用于汇总多个Agent结果,并将汇总结果导入后端存储系统,如HDFS、HBase等。 Flume 附带了许多收集器实现collectorSource(35853) | console 表示将采集结果写入控制台。收藏家来源(35853) | CollectorSink("file:///tmp/flume/collected", "syslog") 表示将收集结果写入本地目录。收集器来源(35853) | CollectorSink("hdfs://namenode/user/flume/","syslog");表示将采集结果写入hdfs目录。

一般情况下,为了防止一台收集器故障而导致所有代理失效,可以将不同的代理连接到不同的收集器。同时,这可以保证不同数据类型的代理将数据放入同一个收集器中。

Master负责管理和协调Agent和Collector的配置信息,是Flume集群的控制器。它可以跟踪数据流的最终确认信息并通知代理。通常需要配置多个master来防止单点故障。使用zookeeper管理多个Master。

构建基于Flume的数据采集系统

Agent 和 Collector 都可以动态配置。可通过命令行或 Web 界面进行配置。命令行配置:在启动的master节点上依次输入“flume shell”→“connect localhost”。例如,执行 exec config a1 'tailDir("/data/logfile")' 'agentSink'。 Web界面:选择节点并填写源、宿等信息。

常见架构示例—拓扑1

agentA:尾部(“/ngnix/logs”)| agentSink(“收集器”,35853);

agentB:尾部(“/ngnix/logs”)| agentSink(“收集器”,35853);

agentC:尾部(“/ngnix/logs”)| agentSink("收集器",35853);

agentD : tail(“/ngnix/logs”) | agentSink("收集器",35853);

agentE:尾部(“/ngnix/logs”)| agentSink("收集器",35853);

agentF:尾部(“/ngnix/logs”)| agentSink("收集器",35853);收集器:收集器源(35853)| CollectorSink("hdfs://namenode/flume/","srcdata");

代理A:src | agentE2ESink("collectorA",35853);

代理B:src | agentE2ESink("collectorA",35853);

agentC:src | agentE2ESink("collectorB",35853);

代理D:src | agentE2ESink("collectorB",35853);

代理E:src | agentE2ESink("collectorC",35853);

agentF:src | agentE2ESink("collectorC",35853);

collectorA:collectorSource(35853)| CollectorSink("hdfs://...","src");

collectorB:collectorSource(35853)| CollectorSink("hdfs://...","src");

collectorC:collectorSource(35853)| CollectorSink("hdfs://...","src");

代理B:src | agentE2EChain("collectorA:35853","collectorC:35853");

agentC:src | agentE2EChain("collectorB:35853","collectorA:35853");

代理D:src | agentE2EChain("collectorB:35853","collectorC:35853");

代理E:src | agentE2EChain("collectorC:35853","collectorA:35853");

agentF:src | agentE2EChain("collectorC:35853","collectorB:35853");

collectorA:collectorSource(35853)| CollectorSink("hdfs://...","src");

collectorB:collectorSource(35853)| CollectorSink("hdfs://...","src");

collectorC:collectorSource(35853)| CollectorSink("hdfs://...","src");

传统数据库与Hadoop之间的数据同步

Sqoop:SQL-to-Hadoop,连接传统关系数据库和Hadoop的桥梁。关系数据库中的数据可以导入到Hadoop系统(例如HDFS HBase和Hive)中,也可以从Hadoop系统中提取数据并导出到关系数据库中。 MapReduce可以用来加速数据传输,实现数据传输的批处理。

Sqoop的优势在于资源的高效可控利用、任务并行、超时等。数据类型映射和转换可以自动化或由用户定制。同时支持MySQL、Oracle、PostgreSQL等多种数据库。

您可以使用sqoop命令生成map任务,将传统数据库中的数据传输到HDFS、Hbase或Hive。

Sqoop import 将数据从关系数据库导入到 Hadoop


第一步:Sqoop与数据库服务器通信,获取数据库表的元数据信息;步骤2:Sqoop启动MapOnly MR作业,并使用元数据信息并行将数据写入Hadoop。

Sqoop 导入用法:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市

--connnect:指定 JDBC URL

--用户名/密码:mysql数据库的用户名

--table:要读取的数据库表
bin/hadoop fs -cat cars/part-m-*

1,美国,帕洛阿尔托

2,捷克共和国,布尔诺

3,美国,桑尼维尔

导入到指定目录:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--target-dir /etl/input/cities

导入字段国家值为USA:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--其中“国家 = '美国'”

生成序列文件:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--作为序列文件

指定地图数量:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--num-mappers 10

Sqoop 导入—导入多个表:

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--查询'www.sychzs.cn,\

国家.国家,\

www.sychzs.cn \

来自普通城市\

使用(country_id)加入国家\

$条件在哪里'\

--按id分割\

--目标目录城市

Sqoop导入增量导入(一):

适用于每次向数据库追加数据,但现有数据不变,仅导入id列值大于1的记录的情况。

sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表访问\

--增量追加\

--检查列id \

--最后值1

Sqoop导入增量导入(2)

每次成功运行后,sqoop都会将最后一条记录的id值保存到metastore中以供下次使用。

sqoop 工作\

--创建访问\

--导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表访问\

--增量追加\

--检查列id \

--最后值0

运行 sqoop 作业:sqoop job --exec 访问

Sqoop导入增量导入(3)

数据库中有一列last_update_date,记录了最后一次修改时间。 Sqoop 仅在一定时间后将数据导入 Hadoop。
sqoop导入\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表访问\

--增量lastmodified\

--检查列last_update_date \

--最后值“2013-05-22 01:01:01”

Sqoop 导出 将数据从 Hadoop 导入关系数据库

第一步:Sqoop与数据库服务器通信,获取数据库表的元数据信息;步骤2:并行导入数据:将Hadoop上的文件分成若干个Split;每个分割均由映射任务导入。

如何使用Sqoop导出
sqoop导出\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--export-dir 城市
--connnect:指定 JDBC URL

--用户名/密码:mysql数据库的用户名

--table:要导入的数据库表

export-dir:HDFS上的数据存储目录

Sqoop 导出—保证原子性:
sqoop 导出 \

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--staging-table staging_cities

Sqoop 导出—更新数据:

sqoop 导出\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--update-key id sqoop 导出\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--更新密钥id \

--更新模式允许插入

Sqoop 导出—选择性插入:
sqoop 导出\

--连接jdbc:mysql://www.sychzs.cn/sqoop \

--用户名sqoop\

--密码sqoop\

--表城市\

--列国家、城市

Sqoop与其他系统结合

     Sqoop可以与Oozie、Hive、Hbase等系统结合;  用户需要在www.sychzs.cn中添加HBASE_HOME、 HIVE_HOME等环境变量。
Sqoop与Hive结合:
sqoop import \
--连接jdbc:mysql://www.sychzs.cn/sqoop \
--用户名sqoop \
--密码sqoop \
--表城市\
--hive-import

Sqoop 与 HBase 结合:
sqoop import \
--连接jdbc:mysql://www.sychzs.cn/sqoop \
--用户名sqoop \
--密码sqoop \
--表城市\
--hbase-表城市 \
--栏目家族世界

以上介绍了数据采集系统和数据传输工具sqoop。 Hive和Pig相关的话题稍后会解释。 -->

相关文章

热门推荐