In the previous stage the cluster was set up and Flume NG was configured to ship the collected data to HDFS. The next step is an initial MapReduce pass over the data, done in three stages:
1. Filter out useless records, such as requests for static resources and requests whose status code is not 200.
2. Enrich the output of step 1: attach a SessionId to every record and, after sorting by access time, add an incrementing step number.
3. Compute preliminary per-visit statistics such as the visit start time, end time, entry page, exit page, and the number of pages visited.
Overall code structure
Step 1: Filtering and cleaning
1. WebLogPreProcess.java implements the first step:
package com.zonegood.hive.mr.pre;

import com.zonegood.hive.mrbean.WebLog;
import com.zonegood.hive.util.ParseUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class WebLogPreProcess {

    static class WebLogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

        // Whitelist of real page URLs; anything not in this set (static resources, etc.) gets filtered out
        Set<String> filter = new HashSet<String>();
        Text k = new Text();
        NullWritable v = NullWritable.get();

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            filter.add("/about");
            filter.add("/black-ip-list/");
            // ... the remaining page URLs are added the same way
        }

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            WebLog bean = ParseUtil.parse(line);
            ParseUtil.filter(bean, filter);
            // Note: the "invalid" flag actually marks a record as valid; it is set to false whenever a check fails
            if (bean.getInvalid()) {
                k.set(bean.toString());
                context.write(k, v);
            }
        }
    }
}
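Only the mapper is shown above, but the imports (Job, FileInputFormat, and so on) and the shell script further down, which runs the jar with an input and an output path, imply a driver as well. Here is a minimal sketch of what that main method could look like inside WebLogPreProcess, assuming a map-only job:

public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf);

    job.setJarByClass(WebLogPreProcess.class);
    job.setMapperClass(WebLogPreProcessMapper.class);

    // Map-only job: the mapper already emits the final <Text, NullWritable> records
    job.setNumReduceTasks(0);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(NullWritable.class);

    FileInputFormat.setInputPaths(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
}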
Next, take a look at the ParseUtil class:
package com.zonegood.hive.util;

import com.zonegood.hive.mrbean.WebLog;

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Locale;
import java.util.Set;

public class ParseUtil {

    public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
    public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

    public static WebLog parse(String line) {
        WebLog webLogBean = new WebLog();
        String[] arr = line.split(" ");
        if (arr.length > 11) {
            webLogBean.setIp(arr[0]);
            webLogBean.setU_info(arr[1]);
            String time_local = formatDate(arr[3].substring(1));
            if (null == time_local) time_local = "-invalid_time-";
            webLogBean.setTime(time_local);
            webLogBean.setRequest_method(arr[5]);
            webLogBean.setRequest_url(arr[6]);
            webLogBean.setStatus(arr[8]);
            webLogBean.setSent_body_bytes(arr[9]);
            webLogBean.setRequest_referer(arr[10]);

            // The user agent contains spaces, so everything from field 11 onward belongs to it
            if (arr.length > 12) {
                StringBuilder sb = new StringBuilder();
                for (int i = 11; i < arr.length; i++) {
                    sb.append(arr[i]);
                }
                webLogBean.setUser_agent(sb.toString());
            } else {
                webLogBean.setUser_agent(arr[11]);
            }

            // Status codes >= 400 are errors: mark the record to be filtered out
            if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                webLogBean.setInvalid(false);
            }

            // Unparseable timestamps are filtered out as well
            if ("-invalid_time-".equals(webLogBean.getTime())) {
                webLogBean.setInvalid(false);
            }
        } else {
            webLogBean.setInvalid(false);
        }

        return webLogBean;
    }

    // Keep only records whose URL is in the whitelist of real pages
    public static void filter(WebLog bean, Set<String> filter) {
        if (!filter.contains(bean.getRequest_url())) {
            bean.setInvalid(false);
        }
    }

    public static String formatDate(String time_local) {
        try {
            return df2.format(df1.parse(time_local));
        } catch (ParseException e) {
            return null;
        }
    }
}
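To sanity-check the parser locally, a small throwaway class like the one below can be used. The log line is made up for illustration (it follows the space-separated Nginx combined format the parser expects), and it assumes WebLog's invalid flag defaults to true, which the mapper's write condition implies:

import com.zonegood.hive.mrbean.WebLog;
import com.zonegood.hive.util.ParseUtil;

public class ParseUtilDemo {
    public static void main(String[] args) {
        // Made-up sample line in Nginx combined log format
        String line = "194.237.142.21 - - [18/Sep/2018:06:49:18 +0000] \"GET /about HTTP/1.1\" "
                + "200 4223 \"-\" \"Mozilla/5.0 (Windows NT 10.0; Win64; x64)\"";
        WebLog bean = ParseUtil.parse(line);
        // Expected: ip=194.237.142.21, time=2018-09-18 06:49:18, url=/about, record still marked valid
        System.out.println(bean.getIp() + " | " + bean.getTime() + " | "
                + bean.getRequest_url() + " | valid=" + bean.getInvalid());
    }
}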
The following shell script, weblog_pre_process.sh, automates running this job:
#!/bin/bash
# Purpose: weblog preprocessing batch script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASS_PATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# Yesterday's date
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# Input path
in_path=/syslog/preprocess/inpath

# Output path
out_path=/syslog/preprocess/outpath

# Jar name
jar_name=weblog_pre_process.jar

# Check whether there is data for yesterday in the input directory
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Starting the preprocess batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
fi

# Send an email if the job failed
if [ $? -gt 0 ];then
    echo "Job failed, sending email..."
fi
Step 2: Log enrichment
Log enrichment is done by ClickStreamPageView.java; here is the core code:
package com.zonegood.hive.mr;

import com.zonegood.hive.mrbean.WebLog;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;

public class ClickStreamPageView {

    static class ClickStreamPageViewMapper extends Mapper<LongWritable, Text, Text, WebLog> {

        WebLog v = new WebLog();
        Text k = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            // The preprocess output is \001-separated; field 0 is the valid flag
            String[] fields = line.split("\001");
            if (fields.length >= 10 && "true".equals(fields[0])) {
                v.setIp(fields[1]);
                v.setU_info(fields[2]);
                v.setTime(fields[3]);
                v.setRequest_url(fields[5]);
                v.setStatus(fields[6]);
                v.setSent_body_bytes(fields[7]);
                v.setRequest_referer(fields[8]);
                v.setUser_agent(fields[9]);
                // Group by IP so one reduce call sees all hits of one visitor
                k.set(v.getIp());
                context.write(k, v);
            }
        }
    }

    static class ClickStreamPageViewReducer extends Reducer<Text, WebLog, NullWritable, Text> {

        public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

        NullWritable k = NullWritable.get();
        Text v = new Text();

        @Override
        protected void reduce(Text key, Iterable<WebLog> values, Context context) throws IOException, InterruptedException {
            ArrayList<WebLog> beans = new ArrayList<WebLog>();
            try {
                // Hadoop reuses the value object, so copy each record before collecting it
                for (WebLog webLog : values) {
                    WebLog pv = new WebLog();
                    try {
                        BeanUtils.copyProperties(pv, webLog);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    beans.add(pv);
                }

                // Sort this visitor's hits by access time
                Collections.sort(beans, new Comparator<WebLog>() {
                    @Override
                    public int compare(WebLog o1, WebLog o2) {
                        try {
                            Date d1 = formatDate(o1.getTime());
                            Date d2 = formatDate(o2.getTime());
                            if (d1 == null || d2 == null) return 0;
                            return d1.compareTo(d2);
                        } catch (Exception e) {
                            return 0;
                        }
                    }
                });

                int step = 1;
                String sessionId = UUID.randomUUID().toString();
                for (int i = 0; i < beans.size(); i++) {
                    // Only one hit: emit it with a default stay time of 60 seconds
                    if (beans.size() == 1) {
                        v.set(sessionId + "\001" + beans.get(i).getIp() + "\001" + step + "\001" + (60) + "\001" +
                                beans.get(i).getU_info() + "\001" + beans.get(i).getTime() + "\001" +
                                beans.get(i).getRequest_url() + "\001" + beans.get(i).getRequest_referer() + "\001" +
                                beans.get(i).getUser_agent() + "\001" + beans.get(i).getSent_body_bytes() + "\001" +
                                beans.get(i).getStatus());
                        context.write(k, v);
                        break;
                    }

                    // Each record is emitted when its successor is seen, so skip the first iteration
                    if (i == 0) {
                        continue;
                    }

                    long diffTime = 0;
                    try {
                        diffTime = diffTime(beans.get(i).getTime(), beans.get(i - 1).getTime());
                    } catch (Exception e) {
                    }

                    if (diffTime > 30 * 60 * 1000) {
                        // More than 30 minutes between two hits: emit the previous hit, then start a new session
                        v.set(sessionId + "\001" + beans.get(i - 1).getIp() + "\001" + step + "\001" + (diffTime / 1000) + "\001" +
                                beans.get(i - 1).getU_info() + "\001" + beans.get(i - 1).getTime() + "\001" +
                                beans.get(i - 1).getRequest_url() + "\001" + beans.get(i - 1).getRequest_referer() + "\001" +
                                beans.get(i - 1).getUser_agent() + "\001" + beans.get(i - 1).getSent_body_bytes() + "\001" +
                                beans.get(i - 1).getStatus());
                        context.write(k, v);
                        sessionId = UUID.randomUUID().toString();
                        step = 1;
                    } else {
                        // Same session: emit the previous hit with its stay time and increase the step number
                        v.set(sessionId + "\001" + beans.get(i - 1).getIp() + "\001" + step + "\001" + (diffTime / 1000) + "\001" +
                                beans.get(i - 1).getU_info() + "\001" + beans.get(i - 1).getTime() + "\001" +
                                beans.get(i - 1).getRequest_url() + "\001" + beans.get(i - 1).getRequest_referer() + "\001" +
                                beans.get(i - 1).getUser_agent() + "\001" + beans.get(i - 1).getSent_body_bytes() + "\001" +
                                beans.get(i - 1).getStatus());
                        context.write(k, v);
                        step++;
                    }

                    // The last hit has no successor, so emit it with a default stay time of 60 seconds
                    if (i == beans.size() - 1) {
                        v.set(sessionId + "\001" + beans.get(i).getIp() + "\001" + step + "\001" + (60) + "\001" +
                                beans.get(i).getU_info() + "\001" + beans.get(i).getTime() + "\001" +
                                beans.get(i).getRequest_url() + "\001" + beans.get(i).getRequest_referer() + "\001" +
                                beans.get(i).getUser_agent() + "\001" + beans.get(i).getSent_body_bytes() + "\001" +
                                beans.get(i).getStatus());
                        context.write(k, v);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        private Date formatDate(String timeStr) throws Exception {
            return df2.parse(timeStr);
        }

        private long diffTime(String t1, String t2) throws Exception {
            return df2.parse(t1).getTime() - df2.parse(t2).getTime();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamPageView.class);

        job.setMapperClass(ClickStreamPageViewMapper.class);
        job.setReducerClass(ClickStreamPageViewReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(WebLog.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(Text.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.waitForCompletion(true);
    }
}
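The session cut in the reducer hinges on the 30-minute rule: whenever two consecutive hits from the same IP are more than 30 * 60 * 1000 milliseconds apart, the earlier hit closes its session and a fresh sessionId is generated. A tiny standalone illustration of the check, using made-up timestamps:

import java.text.SimpleDateFormat;
import java.util.Locale;

public class SessionCutDemo {
    public static void main(String[] args) throws Exception {
        SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
        // Two made-up hits from the same visitor, 45 minutes apart
        long gap = df2.parse("2018-09-18 10:05:00").getTime()
                 - df2.parse("2018-09-18 09:20:00").getTime();
        // 45 min > 30 min, so a new session would start here
        System.out.println(gap > 30 * 60 * 1000);   // prints: true
    }
}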
The automated run script click_stream_page_view.sh:
#!/bin/bash
# Purpose: PageView batch-processing script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASS_PATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# Yesterday's date
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# Input path (the preprocess output)
in_path=/syslog/preprocess/outpath

# Output path
out_path=/syslog/pageview/outpath

# Jar name
jar_name=click_stream_page_view.jar

# Check whether there is data to process for yesterday
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Starting the PageView batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
fi

# Send an email if the job failed
if [ $? -gt 0 ];then
    echo "Job failed, sending email..."
fi
Step 3: Generating initial visit metrics
This step is done mainly by ClickStreamVisit.java; here is the core code:
package com.zonegood.hive.mr;

import com.zonegood.hive.mrbean.PageView;
import com.zonegood.hive.mrbean.VisitBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;

public class ClickStreamVisit {

    static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageView> {

        Text k = new Text();
        PageView v = new PageView();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // Input is the \001-separated output of the PageView job
            String[] fields = value.toString().split("\001");
            v.setSessionId(fields[0]);
            v.setIp(fields[1]);
            v.setStep(fields[2]);
            v.setStayTime(fields[3]);
            v.setU_info(fields[4]);
            v.setTime(fields[5]);
            v.setRequest_url(fields[6]);
            v.setRequest_referer(fields[7]);
            v.setUser_agent(fields[8]);
            v.setSent_body_bytes(fields[9]);
            v.setStatus(fields[10]);
            // Group by session so one reduce call sees the whole visit
            k.set(v.getSessionId());
            context.write(k, v);
        }
    }

    static class ClickStreamVisitReducer extends Reducer<Text, PageView, NullWritable, VisitBean> {

        NullWritable k = NullWritable.get();

        @Override
        protected void reduce(Text key, Iterable<PageView> values, Context context) throws IOException, InterruptedException {

            ArrayList<PageView> beans = new ArrayList<PageView>();

            // Copy each reused value object before collecting it
            for (PageView pv : values) {
                PageView bean = new PageView();
                try {
                    BeanUtils.copyProperties(bean, pv);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                beans.add(bean);
            }

            // Sort the pages of this visit by their step number
            Collections.sort(beans, new Comparator<PageView>() {
                @Override
                public int compare(PageView o1, PageView o2) {
                    return Integer.compare(Integer.parseInt(o1.getStep()), Integer.parseInt(o2.getStep()));
                }
            });

            // The first and last page of the sorted list give the entry/exit page and the start/end time
            VisitBean visitBean = new VisitBean();
            visitBean.setInPage(beans.get(0).getRequest_url());
            visitBean.setOutPage(beans.get(beans.size() - 1).getRequest_url());
            visitBean.setInTime(beans.get(0).getTime());
            visitBean.setOutTime(beans.get(beans.size() - 1).getTime());
            visitBean.setPageVisits(beans.size());
            visitBean.setRemote_addr(beans.get(0).getIp());
            visitBean.setSession(beans.get(0).getSessionId());
            visitBean.setReferal(beans.get(0).getRequest_referer());
            context.write(k, visitBean);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(ClickStreamVisit.class);

        job.setMapperClass(ClickStreamVisitMapper.class);
        job.setReducerClass(ClickStreamVisitReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(PageView.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(VisitBean.class);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);
    }
}
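The bean classes in com.zonegood.hive.mrbean (WebLog, PageView, VisitBean) are not shown in this post. For reference, here is a minimal sketch of what VisitBean could look like: the field names follow the setters used in the reducer above, while the Writable implementation, the serialization order, and the toString layout are my assumptions.

package com.zonegood.hive.mrbean;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

// Sketch only: field names taken from the setters used in ClickStreamVisitReducer,
// everything else is an assumption
public class VisitBean implements Writable {

    private String session;
    private String remote_addr;
    private String inTime;
    private String outTime;
    private String inPage;
    private String outPage;
    private String referal;
    private int pageVisits;

    // Setters used by the reducer (matching getters would be added the same way)
    public void setSession(String session) { this.session = session; }
    public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
    public void setInTime(String inTime) { this.inTime = inTime; }
    public void setOutTime(String outTime) { this.outTime = outTime; }
    public void setInPage(String inPage) { this.inPage = inPage; }
    public void setOutPage(String outPage) { this.outPage = outPage; }
    public void setReferal(String referal) { this.referal = referal; }
    public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(session);
        out.writeUTF(remote_addr);
        out.writeUTF(inTime);
        out.writeUTF(outTime);
        out.writeUTF(inPage);
        out.writeUTF(outPage);
        out.writeUTF(referal);
        out.writeInt(pageVisits);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        session = in.readUTF();
        remote_addr = in.readUTF();
        inTime = in.readUTF();
        outTime = in.readUTF();
        inPage = in.readUTF();
        outPage = in.readUTF();
        referal = in.readUTF();
        pageVisits = in.readInt();
    }

    // The job uses the default TextOutputFormat, so toString() decides the on-disk layout
    @Override
    public String toString() {
        return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime + "\001"
                + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
    }
}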
Below is the accompanying automation script click_stream_visit.sh:
#!/bin/bash
# Purpose: Visit batch-processing script
# Author: 赵一好

# Java environment
export JAVA_HOME=/data/jdk1.8.0_161
export JRE_HOME=${JAVA_HOME}/jre
export CLASS_PATH=${JAVA_HOME}/lib/dt.jar:${JAVA_HOME}/lib/tools.jar
export PATH=$PATH:${JAVA_HOME}/bin:${JRE_HOME}/bin

# Hadoop environment
export HADOOP_HOME=/data/hadoop-2.7.3
export PATH=$PATH:${HADOOP_HOME}/bin:${HADOOP_HOME}/sbin

# Yesterday's date
yesterday=`date -d'-1 day' +%Y-%m-%d`
s_year=`date -d'-1 day' +%Y`
s_month=`date -d'-1 day' +%m`
s_day=`date -d'-1 day' +%d`

# Input path (the pageview output)
in_path=/syslog/pageview/outpath

# Output path
out_path=/syslog/visit/outpath

# Jar name
jar_name=click_stream_visit.jar

# Check whether there is data to process for yesterday
files=`hdfs dfs -ls $in_path | grep $yesterday | wc -l`
if [ $files -gt 0 ];then
    echo "Starting the Visit batch job, output directory: $out_path/$yesterday"
    hadoop jar $jar_name $in_path/$yesterday $out_path/$yesterday
fi

# Send an email if the job failed
if [ $? -gt 0 ];then
    echo "Job failed, sending email..."
fi
Automation
The three shell scripts are scheduled with the previously deployed Azkaban scheduler to complete the automated ETL flow.
1. Write the auto_run.sh script:
#!/bin/bash
# Purpose: run the preprocess, pageview and visit batch scripts in order

sh weblog_pre_process.sh
sh click_stream_page_view.sh
sh click_stream_visit.sh
2. Package the three MR programs into executable jars (for example with Maven's package goal; the details are not covered here):
click_stream_page_view.jar
click_stream_visit.jar
weblog_pre_process.jar
3. Write the Azkaban job, package it into a zip, and upload it to the Azkaban platform:
# foo.job
type=command
command=sh auto_run.sh
foo.zip must contain all of the files listed above, as shown in the figure below.
Run the foo flow; a green bar means it completed successfully.
The detailed Azkaban steps are a bit tedious and were covered in an earlier post, so they are skipped here.