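Module 2: Data Acquisition and Processing
(I) Task 1: Data Acquisition and Cleaning
1. Subtask 1: Data Acquisition
Write an agent file, power.conf, and use Flume to collect the drone inspection data in power.txt (for the data file, see the data cleaning part);
The target (sink) type is HDFS
The write location is /source/logs/power/ on HDFS
Answer: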
# Name the components of the agent called "agent": one source, one channel, one sink
agent.sources = r1
agent.channels = c1
agent.sinks = k1
# Configure the source
agent.sources.r1.type = exec
agent.sources.r1.command = tail -F /root/eduhq/power.txt
# Configure the channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100
# Configure the sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://master:9000/source/logs/power/%Y-%m-%d/%H
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
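# Keep batchSize within the channel's transactionCapacity (100);
# a larger batch can overflow the memory channel's transaction
agent.sinks.k1.hdfs.batchSize = 100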
agent.sinks.k1.hdfs.rollInterval = 3000
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
agent.sinks.k1.hdfs.useLocalTimeStamp = true
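# Bind the source and sink to the channel
agent.sources.r1.channels = c1
agent.sinks.k1.channel = c1
Start the agent from the Flume installation directory (the relative bin/ and conf/ paths assume it is the working directory):
bin/flume-ng agent -n agent -c conf -f conf/power.conf -Dflume.root.logger=INFO,console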
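The hdfs.path above contains the escapes %Y, %m, %d and %H, and useLocalTimeStamp = true makes the sink fill them from the agent's local clock, so events land in one directory per day and hour under /source/logs/power/. The following is a rough sketch of that expansion, not Flume's own code: the function name resolve_hdfs_path is made up for illustration, and it relies on Python's strftime treating these four escapes the same way.

```python
from datetime import datetime

# Same template as hdfs.path in power.conf
HDFS_PATH_TEMPLATE = "hdfs://master:9000/source/logs/power/%Y-%m-%d/%H"


def resolve_hdfs_path(template, when):
    """Expand the date/time escapes in `template` for the moment `when`.

    Hypothetical helper: it only mimics the %Y, %m, %d and %H escapes
    used in this configuration.
    """
    return when.strftime(template)


if __name__ == "__main__":
    # An event written at 09:30 on 5 March 2024 (made-up timestamp)
    print(resolve_hdfs_path(HDFS_PATH_TEMPLATE, datetime(2024, 3, 5, 9, 30)))
    # prints hdfs://master:9000/source/logs/power/2024-03-05/09
```

Knowing the resolved directory helps when checking the result, since the collected files appear under the dated subdirectory rather than directly in /source/logs/power/.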
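2. Subtask 2: Data Cleaning
(1) Clean the drone inspection table power.txt in the /root/eduhq/ directory: delete the header in the first line so that the Hive import does not fail, delete the first two columns of dirty data, and save the result as new_power.txt;
Answer: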
#!/usr/bin/env python3
# Input and output file paths
input_file = "/root/eduhq/power.txt"
output_file = "/root/eduhq/new_power.txt"
# Read every line of the input file
with open(input_file, 'r') as file:
    lines = file.readlines()
# Skip the first line (the header) and drop the first two columns;
# split() with no argument splits on any run of whitespace
cleaned_lines = [line.split()[2:] for line in lines[1:]]
# Write the cleaned rows to the output file, tab-separated
with open(output_file, 'w') as file:
    for line in cleaned_lines:
        file.write('\t'.join(line) + '\n')
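(2) Clean the inspector table power_people.txt in the /root/eduhq/ directory: delete the header in the first line so that the Hive import does not fail, delete the first two columns of dirty data, and save the result as new_power_people.txt.
Answer: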
#!/usr/bin/env python3
# Input and output file paths
input_file = "/root/eduhq/power_people.txt"
output_file = "/root/eduhq/new_power_people.txt"
# Read every line of the input file
with open(input_file, 'r') as file:
    lines = file.readlines()
# Skip the first line (the header) and drop the first two columns;
# split() with no argument splits on any run of whitespace
cleaned_lines = [line.split()[2:] for line in lines[1:]]
# Write the cleaned rows to the output file, tab-separated
with open(output_file, 'w') as file:
    for line in cleaned_lines:
        file.write('\t'.join(line) + '\n')
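Both answers apply the same two steps, so they could share one helper. The sketch below is one possible way to do that, not part of the original answer: the names clean_table and demo, the sep parameter, the UTF-8 encoding and the sample rows are all assumptions made for illustration, since the real column layout of power.txt is not shown here. Unlike the scripts above, it also skips blank lines instead of writing them out as empty rows.

```python
import os
import tempfile


def clean_table(input_path, output_path, skip_rows=1, drop_cols=2, sep=None):
    """Drop the first `skip_rows` lines and the first `drop_cols` columns.

    sep=None splits on any run of whitespace, like the scripts above;
    pass sep="," (for example) if the file is comma-separated.
    Returns the number of rows written.
    """
    written = 0
    with open(input_path, "r", encoding="utf-8") as src, \
            open(output_path, "w", encoding="utf-8") as dst:
        for index, line in enumerate(src):
            if index < skip_rows or not line.strip():
                continue  # header row or blank line
            fields = line.rstrip("\r\n").split(sep)
            dst.write("\t".join(fields[drop_cols:]) + "\n")
            written += 1
    return written


def demo():
    """Run clean_table on made-up sample rows and return the cleaned text."""
    sample = "id\tflag\tline\tvoltage\n1\tx\tL-01\t220\n2\ty\tL-02\t110\n"
    with tempfile.TemporaryDirectory() as tmp:
        src = os.path.join(tmp, "power.txt")
        dst = os.path.join(tmp, "new_power.txt")
        with open(src, "w", encoding="utf-8") as handle:
            handle.write(sample)
        clean_table(src, dst)
        with open(dst, "r", encoding="utf-8") as handle:
            return handle.read()


if __name__ == "__main__":
    print(demo(), end="")
```

For the two tasks above, the calls would be clean_table("/root/eduhq/power.txt", "/root/eduhq/new_power.txt") and clean_table("/root/eduhq/power_people.txt", "/root/eduhq/new_power_people.txt"). If any field contains spaces, whitespace splitting will break it apart, so passing the file's actual delimiter as sep is the safer choice.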