Module 2: Data Acquisition and Processing

(I) Task 1: Data Acquisition and Cleaning

1. Subtask 1: Data Acquisition

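Write the agent file power.conf and use Flume to collect the UAV inspection data power.txt (for the data file, refer to the data cleaning section).

The target (sink) type is HDFS.

The write location is /source/logs/power/ on HDFS.

Answer: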

# Define the agent's components
agent.sources = r1
agent.channels = c1
agent.sinks = k1

# Configure the source: an exec source that streams the file.
# "-n +1" makes tail emit the file from its first line;
# plain "tail -F" would start at the last 10 lines only.
agent.sources.r1.type = exec
agent.sources.r1.command = tail -n +1 -F /root/eduhq/power.txt
agent.sources.r1.channels = c1

# Configure the channel
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000
agent.channels.c1.transactionCapacity = 100

# Configure the sink
agent.sinks.k1.type = hdfs
agent.sinks.k1.hdfs.path = hdfs://master:9000/source/logs/power/%Y-%m-%d/%H
agent.sinks.k1.hdfs.fileType = DataStream
agent.sinks.k1.hdfs.writeFormat = Text
# batchSize must not exceed the channel's transactionCapacity (100)
agent.sinks.k1.hdfs.batchSize = 100
# Roll files on time only (every 3000 seconds); size- and count-based rolling are disabled
agent.sinks.k1.hdfs.rollInterval = 3000
agent.sinks.k1.hdfs.rollSize = 0
agent.sinks.k1.hdfs.rollCount = 0
# Resolve %Y-%m-%d/%H from the agent's local time instead of an event header
agent.sinks.k1.hdfs.useLocalTimeStamp = true
agent.sinks.k1.channel = c1
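
A memory channel hands a sink at most transactionCapacity events per transaction, so the HDFS sink's hdfs.batchSize should not be larger than that value. The following is a minimal sketch of a pre-flight check for this constraint, not part of Flume itself. The helper names parse_properties and batch_fits_channel are hypothetical, the sample text is illustrative, and the fallback value of 100 for both keys is taken to be the Flume default.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, ignoring blank lines and # comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def batch_fits_channel(props, agent="agent", sink="k1"):
    """Return True if the sink's hdfs.batchSize fits in its channel's transactionCapacity."""
    channel = props[f"{agent}.sinks.{sink}.channel"]
    # Fall back to 100 when a key is absent (assumed Flume default for both settings).
    batch = int(props.get(f"{agent}.sinks.{sink}.hdfs.batchSize", 100))
    capacity = int(props.get(f"{agent}.channels.{channel}.transactionCapacity", 100))
    return batch <= capacity


# Illustrative fragment: a batch of 10000 cannot fit in a transaction of 100.
sample = """
agent.channels.c1.transactionCapacity = 100
agent.sinks.k1.channel = c1
agent.sinks.k1.hdfs.batchSize = 10000
"""
print(batch_fits_channel(parse_properties(sample)))  # False
```

To check a real file, pass the contents of conf/power.conf to parse_properties instead of the sample string.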

 

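Start the agent from the Flume installation directory (the name passed to -n must match the "agent" prefix used in power.conf):

bin/flume-ng agent -n agent -c conf -f conf/power.conf -Dflume.root.logger=INFO,console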
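
The sink path ends in %Y-%m-%d/%H, so the data lands in date and hour subdirectories under /source/logs/power/ rather than directly in that directory. As a rough illustration of where to look after the agent has run, the sketch below expands the template for a given time. It relies on the fact that the four escapes used here (%Y, %m, %d, %H) have the same meaning in Python's strftime; other Flume escape sequences do not map this way. The function name expand_path and the example time are assumptions for illustration.

```python
from datetime import datetime

# Path template copied from the sink configuration.
HDFS_PATH = "hdfs://master:9000/source/logs/power/%Y-%m-%d/%H"


def expand_path(template, when):
    """Expand the %Y, %m, %d and %H escapes for the given timestamp."""
    return when.strftime(template)


# Example: an event handled at 14:30 local time on 5 March 2024.
print(expand_path(HDFS_PATH, datetime(2024, 3, 5, 14, 30)))
# hdfs://master:9000/source/logs/power/2024-03-05/14
```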

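2. Subtask 2: Data Cleaning

(1) Clean the UAV inspection table power.txt in the /root/eduhq/ directory: delete the header (the first line) to avoid errors when importing into Hive, delete the first two columns of dirty data, and save the result as new_power.txt;

Answer: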

#!/usr/bin/env python3

# Input and output file paths
input_file = "/root/eduhq/power.txt"
output_file = "/root/eduhq/new_power.txt"

# Read the whole input file
with open(input_file, "r") as file:
    lines = file.readlines()

# Skip the first line (the header) and drop the first two whitespace-separated columns
cleaned_lines = [line.split()[2:] for line in lines[1:]]

# Write the cleaned rows to the output file, tab-separated
with open(output_file, "w") as file:
    for line in cleaned_lines:
        file.write("\t".join(line) + "\n")
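
To make the effect of the cleaning step concrete, here is a small sketch that applies the same transformation to in-memory sample rows. The column names and values are invented for illustration and are not taken from power.txt; the function name clean_rows is hypothetical. Unlike the script above, this version also skips blank lines. Note that splitting on whitespace assumes no field contains spaces.

```python
def clean_rows(lines):
    """Drop the header line, then drop the first two whitespace-separated fields of each row."""
    cleaned = []
    for line in lines[1:]:
        fields = line.split()
        if fields:  # skip blank lines
            cleaned.append("\t".join(fields[2:]))
    return cleaned


# Hypothetical sample: a header row followed by two data rows.
sample = [
    "idx flag device status\n",
    "1 x UAV-01 normal\n",
    "2 y UAV-02 fault\n",
]
print(clean_rows(sample))
# ['UAV-01\tnormal', 'UAV-02\tfault']
```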

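(2) Clean the inspection personnel table power_people.txt in the /root/eduhq/ directory: delete the header (the first line) to avoid errors when importing into Hive, delete the first two columns of dirty data, and save the result as new_power_people.txt.

Answer: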

#!/usr/bin/env python3

# Input and output file paths
input_file = "/root/eduhq/power_people.txt"
output_file = "/root/eduhq/new_power_people.txt"

# Read the whole input file
with open(input_file, "r") as file:
    lines = file.readlines()

# Skip the first line (the header) and drop the first two whitespace-separated columns
cleaned_lines = [line.split()[2:] for line in lines[1:]]

# Write the cleaned rows to the output file, tab-separated
with open(output_file, "w") as file:
    for line in cleaned_lines:
        file.write("\t".join(line) + "\n")
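
The two cleaning scripts differ only in their file paths, so one possible refactoring is a single helper called once per table. The sketch below is one way to do that, under the same assumption that fields are whitespace-separated. It reads the input line by line instead of loading it all at once. The names clean_file and skip_columns are hypothetical, and the demo runs on a temporary file with invented rows rather than the real data.

```python
import os
import tempfile


def clean_file(src, dst, skip_columns=2):
    """Copy src to dst without the header line and without the first skip_columns fields."""
    with open(src, "r") as fin, open(dst, "w") as fout:
        next(fin, None)  # discard the header line, if any
        for line in fin:
            fields = line.split()
            if fields:  # skip blank lines
                fout.write("\t".join(fields[skip_columns:]) + "\n")


# Demo on a temporary file with invented contents.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "power_people.txt")
    dst = os.path.join(tmp, "new_power_people.txt")
    with open(src, "w") as f:
        f.write("idx flag name team\n1 x Zhang A\n2 y Li B\n")
    clean_file(src, dst)
    with open(dst) as f:
        print(f.read())
```

For the tasks above, the calls would be clean_file("/root/eduhq/power.txt", "/root/eduhq/new_power.txt") and clean_file("/root/eduhq/power_people.txt", "/root/eduhq/new_power_people.txt").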
