关于MapReduce
MapReduce作业时客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个小任务来执行,其中包括两类任务:map任务和Reduce任务。
有两类节点控制着作业执行过程:一个jobtracker 及一系列tasktracker。Jobtracker 通过调度tasktracker上运行的任务,来协调所有运行在系统上的作业。tasktracker在运行任务的同时将进度报告给jobtracker,jobtracker由此记录每项作业任务的整体进度情况。如果其中一个任务失败,jobtracker可以在另外一个tasktracker节点上重新调度该任务。
Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片或者简称分片。Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数,从而处理分片中的每条记录。对于大多数作业来说,一个合理的分片大小趋向于HDFS的一个块大小,默认是64MB,不过可以根据具体场景针对集群调整这个默认值,在新建所有文件或新建每个文件时具体指定即可。可以尽量避免如果分片跨两个数据块,那么对于任何一个HDFS节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到map任务节点。与使用本地数据运行整个map任务相比,显然这种效率更低。
Hadoop在存储有输入数据(HDFS中的数据)的节点上运行map任务,可以获得最佳性能。这就是所谓的数据本地化优化。
map任务将其输出写入本地硬盘,而非HDFS。这是为什么? 因为map 的输出是中间结果:该中间结果由reduce任务处理后才产生最终输出结果,而且一旦作业完成,map的输出结果可以被删除。因此把它存储在HDFS 中实现备份,难免有些小题大做。如果该节点上运行的map任务 在将map中间结果传送给reduce任务前失败,Hadoop将在另一个节点重新运行这个map任务以再次构建map中间结果。
reducer任务比不具有本地化的优势——单个reduce任务的输入通常来自于所有mapper的输出。(我的理解是:一个或者N个mapper都可能产生0个或多个reducer输出,如果是多个,每个mapper将会对其输出进行分区处理。)
集群上的可用带宽限制了MapReduce作业的数量,因此最重要的一点是尽量避免map任务和reduce任务之间的数据传输。
互联网有其自身发展逻辑,理解了其中规律,可以有效把握创新的方向,对随机性创新有结构性分析和认知,清楚其发展方向及不足。通过对互联网应用的梳理和讨论,展示其中部分发展逻辑。
先说个常识
在新浪微博上,每秒钟都有上万条微博发布,一些微博被评论转发,出现在众多不知名的网民前。我们的城市也是如此,几十上百万的市民一早离家奔走于城市各个角落,开始一天的生活,傍晚他们再次穿越城市,回到家中。城市庞大的交通网络支撑着人员频繁流动,维持着城市繁荣,社交网络天量级的数据传播更加少不了高效的信息传播体系,FB新浪微博作为社交网络犹如靓丽的城市被我们所熟知,信息传播体系与城市交通运输体系一样默默支撑着社交网络和城市的发展壮大。信息传播体系与社交网络是两个完全不同的概念,将两者相混淆是很多人的误解。
我们认为互联网的发展逻辑也如色彩的三原色一样,通过三个基本模块的演绎,就可以描述大部分的互联网应用。这三个模块是信息生产,信息传播,信息接收,我们先对这三模块进行简单讨论,再对具体的案例进行解读。
基本模块
首先是显示类,信息如何显示在我们面前,是我们对其进行传播、加工的第一步。此类创新又可以分为软件和硬件两个方面,软件指内容编排方式的创新,比如flipboard杂志,pinterest瀑布流,win8格子,根据图片逻辑显示的织图,3D,橱窗浏览,无限缩放,多层页面展示等,硬件方面包括分辨率,尺寸,曲面屏幕,多屏显示,投影,穿戴装备等。
我们接触到的信息形式大致为文字,声音,图像,视频,信息生产则指产生上述内容的应用。文字方面有输入法,网络流行语,长微博,设计诗,第三方评论等,声音方面有微信,啪啪,唱吧,声音微博等,图像则有相机应用及图片处理应用,instagram,美图秀秀,贴纸表情等,视频有自拍,vine比较少。这是软件方面,硬件则有体感、传感器和智能手机,google glass ,lumia,leap motion,海淀桥路况,铛!铛!铛!。
至此,我们已经可以进行有章法的创新,比如显示方面,加入时间线,不同的时间点显示不同的预制模板,以满足不同的情境下,不同内容展现。文字方面是排版的创新,文本模块化,考虑评论内容,更便利的图文,gif,声音视频混排,以满足当下的快速、互动的阅读需求。图片动作化,视频图像化,视频加工凸显具体物体的运动,以根据视频特点满足不同的信息需求。能够降低网民信息生产门槛的创新,基本上能符合当下网民需求大方向,不会导致方向性的错误。
信息传播体系是当下阶段创新的核心。信息传播的方式有很多种,比如一对多,微博上的博主与粉丝关系,微信上的公共帐号,微博的转发网络,关键字传播,这是一对未知的多,如果是一对多个熟人,则有朋友圈、微博群组等,一对一有私信、微信、@、陌陌等,一对未知的一,有定向发布。多种传播方式构成一个信息传播体系,维持信息的有效传播。恰如城市里的交通系统有地铁、公交、出租车、私家车等不同的运输方式满足不同的流动需求。
最后是信息接收模块,信息在微博、微信、地图、浏览器、淘宝、app市场上被我们看到,这也是入口之争。不过我们知道信息是在传播渠道终端出现,传播渠道在哪里,信息就会出现在哪里,所以入口之争的本义应该是对传播渠道的占有,在社会化媒体阶段,渠道是动态的,不可控的,发出一条微博你不知道谁会转发,这条微博会被传播到哪些人群中,传播渠道的动态化,导致入口的多变,现有的入口之争思维方式是存在问题的。个体信息处理中心将会是信息接收模块的接口,搜狐浏览器,网易类的新闻终端有这样的雏形。在硬件方面,就是信息显示在何种终端上,智能手机,智能家电,汽车,可穿戴硬件,体感设备等,这就将讨论延伸到移动互联网和物联网上了。互联网的发展逻辑是连贯的。
应用的分解
我们可以用这三个基本模块搭积木一样对互联网应用进行解读。
成功的创新是能够满足用户的某种需求,在当下,用户既是生产者,使用者,也是传播者,比如说拍张照片,签个到,他们发出了照片和签到信息,同时也将照片和签到信息传播给了自己的好友粉丝,这里就有两种理解,我把照片分享给好友,我们进行了沟通互动,这是社交活动;另一种是我用照片建立了一个传播网络,可能只有几个朋友刚好看到了,看到的也未必都与我就这照片进行互动,但通过这样的传播体系,我有效的将照片传播给特定人群。
我们可以看下美图秀秀和街旁,首先他们是否能让用户便利的拍照,对照片进行修饰,很容易的生产信息,街旁能否让人快捷的签到及标签情境,这个问题解决了,就根据所满足的需求选择不同的传播渠道,照片分享在不同的需求情境下代表不同的意义,可能是记录,可能是炫耀,可能是沟通,可能是搞怪等很多种,这就在传播途径上有不同的选择,对分享的群组进行归类,以进行定向分享,让照片在熟人圈中传播,或家人间传播,对照片进行标签,根据标签来分享等,来满足不同的需求。街旁同样如此,除了根据用户的不同设置不同的传播通道外,分享的另一面是信息的获取,我们利用标签将自己的内容传播出去,同时自己感兴趣的内容也能便捷的接收到,当我标签女装店时,我进入的是一个以女装为标签的传播网络,在这个网络里,我能与之相关的人群建立联系,直接查看女装店的图片、评价、优惠信息,或询问店主具体地址。
我也设想了这样一个场景,早上地铁或公交上,打开一款app,输入自己吃的早餐,面包、馒头还是面条,这样就建立了一个以“早餐吃过什么”为主题的传播通道,在该通道内收到早上的趣闻乐事,或者统计一车厢一公里范围内的人数,举行面包、馒头、面条游戏战等各种互动。这算是个模板,可以举一反三推演。
我们再把积木做些延伸,前面讨论过信息发布和接收的硬件都可通过智能终端,比如手机来实现,我们的讨论可以平移进移动互联网领域。PC互联网与移动互联网本质上是一样的,其同质性远大于所宣称的差异。讨论移动互联网要说O2O,我们将其分为Offline 2 Online和Online 2 Offline两部分。线下往互联网迁移在于通过互联网能够获得更大传播的影响力,以及能够为用户提供更好的消费体验,互联网往线下转移在于能够找到在合适的时间和场所来匹配自身的需求。要做O2O,首先要考虑的线下与互联网上两群人是否由于时间和场所这两个原因造成错失交易机会,比如出租车,场所和技师的服务时间,如是,做O2O则有利于两者的信息沟通,创造交易机会,另一种是两者沟通达成更好的消费体验,查看场所具体情形,预约,选座点单等。当确认了目标用户的需求后,就是将Offline 2 Online的过程,你可以用手机,思维开阔些的可以用智能终端,传感器类的,简化Offline的信息生产步骤,快速上线,接下去是传播体系的选择,如何将Offline与Online的信息进行匹配,不同的需求需要采用不同的传播途径,微信是一个选择,标签化传播通道是另一个,一对多的广播传播也算一个。这是O2O的大致框架。O2O如果深究起来,算是微博上的细分渠道。
O2O与电子商务关系密切,接着谈淘宝。一个交易的达成,首先是大家能够遇到一起,也就是交易机会,第2步是达成商品价值价格上的一致,最后成交交钱拿货。淘宝的价值是提供交易机会,给店家带来流量,撮合店家与消费者相互协商。而这样提供交易机会的能力,淘宝正在消失。在社会化媒体阶段,消费者很容易的建立以自身需求的传播通道,比如建立运动鞋的标签,那么通过这一标签,我可以得到有关运动鞋的大量资讯,包括数量众多的卖家。不仅如此,选定大致几种款式,再次建立以其为标签的传播通道,可以很容易收集到几种款式的价值评价,可以对运动鞋有更准确的判断。相关传播通道所提供的信息根据用户具体需求情境,需求的不同,更为详尽准确和个性化。其次,供应商与消费者关系的演变,C2B形式的扩展,对现有店家也是一大冲击。淘宝维系其商业生态的基础及能力逐步被瓦解,淘宝的颓势将无可避免。
说说微博与微信。微博有完整的信息传播体系,一对一、一对多、关键字、转发等,微信则有一对一、一对多的公共帐号,前面打过比方,微博就如城市的交通系统,微信则是出租车,两者的信息传播效率不再一个档次上。微信的一对一,就如出租车一样点对点,直接有效,一对多的公共账号有如出租车拼车,有我需要的内容,但我
一个很大的文件,例如10G,仅包含ip地址和访问时间二列,格式如下:
127.0.0.1 2013-07-22 14:00 127.0.0.1 2013-07-22 14:02 127.0.0.1 2013-07-22 14:03 127.0.0.3 2013-07-22 14:03 127.0.0.1 2013-07-22 14:04 127.0.0.1 2013-07-22 14:05 127.0.0.1 2013-07-22 14:06 127.0.0.1 2013-07-22 14:07 127.0.0.1 2013-07-22 14:08 127.0.0.1 2013-07-22 14:09 127.0.0.1 2013-07-22 14:10 127.0.0.1 2013-07-22 14:11 127.0.0.1 2013-07-22 14:12 127.0.0.1 2013-07-22 14:13 127.0.0.4 2013-07-22 14:13 127.0.0.1 2013-07-22 14:15 127.0.0.1 2013-07-22 14:16 127.0.0.4 2013-07-22 14:17 ... ...
从文件里查出在5分钟内连续登陆10次以上的ip地址集合并输出。这类问题是一个很常见的应用,通常都是从大的log日志文件中找出有攻击嫌疑的ip。
这类应用因为要处理分析的文件非常大,显然不能将整个文件全部读入内存,然后进行下一步工作。常见的比较成熟的解决方案有:分治+Hash,Bloom filter,2-Bitmap等。可参考http://blog.csdn.net/v_JULY_v/article/details/6279498
这里就使用第一种方式来解决。
下面是分治与hash的代码
public class DuplicateIP { private String delimiter = " "; private String FILE_PRE = "ip_"; private int MAGIC = 10,BATCH_MAGIC = 500; private String root = "/DuplicateIP/"; private String filename = ""; public DuplicateIP(final String filename) { this.filename = filename; } /** * 将大文件拆分成较小的文件,进行预处理 * @throws IOException */ private void preProcess() throws IOException { //Path newfile = FileSystems.getDefault().getPath(filename); BufferedInputStream fis = new BufferedInputStream(new FileInputStream(new File(filename))); // 用5M的缓冲读取文本文件 BufferedReader reader = new BufferedReader(new InputStreamReader(fis,"utf-8"),5*1024*1024); //假设文件是10G,那么先根据hashcode拆成小文件,再进行读写判断 //如果不拆分文件,将ip地址当做key,访问时间当做value存到hashmap时, //当来访的ip地址足够多的情况下,内存开销吃不消 // List<Entity> entities = new ArrayList<Entity>(); //存放ip的hashcode->accessTimes集合 Map<String,List<String>> hashcodeMap = new HashMap<String,List<String>>(); String line = ""; int count = 0; while((line = reader.readLine()) != null){ String split[] = line.split(delimiter); if(split != null && split.length >= 2){ //根据ip的hashcode这样拆分文件,拆分后的文件大小在1G上下波动 //极端情况是整个文件的ip地址全都相同,只有一个,那么拆分后还是只有一个文件 int serial = split[0].trim().hashCode() % MAGIC; String splitFilename = FILE_PRE + serial; List<String> lines = hashcodeMap.get(splitFilename); if(lines == null){ lines = new ArrayList<String>(); hashcodeMap.put(splitFilename, lines); } lines.add(line); } count ++; if(count > 0 && count % BATCH_MAGIC == 0){ for(Map.Entry<String, List<String>> entry : hashcodeMap.entrySet()){ //System.out.println(entry.getKey()+"--->"+entry.getValue()); DuplicateUtils.appendFile(root + entry.getKey(), entry.getValue(), Charset.forName("UTF-8")); } //一次操作500之后清空,重新执行 hashcodeMap.clear(); } } reader.close(); fis.close(); } private boolean process() throws IOException{ Path target = Paths.get(root); //ip -> List<Date> Map<String,List<Date>> resMap = new HashMap<String,List<Date>>(); this.recurseFile(target,resMap); for(Map.Entry<String, List<Date>> entry : resMap.entrySet()){ System.out.println(entry.getKey()); for(Date date : entry.getValue()){ System.out.println(date); } } return true; } /** * 递归执行,将5分钟内访问超过阈值的ip找出来 * * @param parent * @return * @throws IOException */ private void recurseFile(Path parent,Map<String,List<Date>> resMap) throws IOException{ //Path target = Paths.get(dir); if(!Files.exists(parent) || !Files.isDirectory(parent)){ return; } Iterator<Path> targets = parent.iterator(); for(;targets.hasNext();){ Path path = targets.next(); if(Files.isDirectory(parent)){ //如果还是目录,递归 recurseFile(path.toAbsolutePath(),resMap); }else { //将一个文件中所有的行读上来 List<String> lines = Files.readAllLines(path, Charset.forName("UTF-8")); judgeAndcollection(lines,resMap); } } } /** * 根据从较小文件读上来的每行ip accessTimes进行判断符合条件的ip * 并放入resMap * * @param lines * @param resMap */ private void judgeAndcollection(List<String> lines,Map<String,List<Date>> resMap) { if(lines != null){ //ip->List<String>accessTimes Map<String,List<String>> judgeMap = new HashMap<String,List<String>>(); for(String line : lines){ line = line.trim(); int space = line.indexOf(delimiter); String ip = line.substring(0, space); List<String> accessTimes = judgeMap.get(ip); if(accessTimes == null){ accessTimes = new ArrayList<String>(); } accessTimes.add(line.substring(space + 1).trim()); judgeMap.put(ip, accessTimes); } if(judgeMap.size() == 0){ return; } for(Map.Entry<String, List<String>> entry : judgeMap.entrySet()){ List<String> acessTimes = entry.getValue(); //相同ip,先判断整体大于10个 if(acessTimes != null && acessTimes.size() >= MAGIC){ //开始判断在List集合中,5分钟内访问超过MAGIC=10 List<Date> attackTimes = DuplicateUtils.attackList(acessTimes, 5 * 60 * 1000, MAGIC); if(attackTimes != null){ resMap.put(entry.getKey(), attackTimes); } } } } } /** * @param args */ public static void main(String[] args) { String filename = "/DuplicateIP/log.txt"; DuplicateIP dip = new DuplicateIP(filename); try { dip.preProcess(); dip.process(); } catch (IOException e) { e.printStackTrace(); } } }
下面是工具类,提供了一些文件读写及查找的功能
public class DuplicateUtils { /** * 根据给出的数据,往给定的文件形参中追加一行或者几行数据 * * @param file * @throws IOException */ public static Path appendFile(String splitFilename, Iterable<? extends CharSequence> accessTimes,Charset cs) throws IOException { if(accessTimes != null){ Path target = Paths.get(splitFilename); if(target == null){ createFile(splitFilename); } return Files.write(target, accessTimes, cs);//, options) } return null; } /** * 创建文件 * @throws IOException */ public static void createFile(String splitFilename) throws IOException { Path target = Paths.get(splitFilename); Set<PosixFilePermission> perms = PosixFilePermissions.fromString("rw-rw-rw-"); FileAttribute<Set<PosixFilePermission>> attr = PosixFilePermissions.asFileAttribute(perms); Files.createFile(target, attr); } public static Date stringToDate(String dateStr,String dateStyle){ if(dateStr == null || "".equals(dateStr)) return null; DateFormat format = new SimpleDateFormat(dateStyle);//"yyyy-MM-dd hh:mm:ss"); try { return format.parse(dateStr); } catch (ParseException e) { e.printStackTrace(); return null; } } public static String dateToString(Date date,String dateStyle){ if(date == null) return null; DateFormat format = new SimpleDateFormat(dateStyle); return format.format(date); } /** * 根据间隔时间,判断列表中的数据是否已经大于magic给出的魔法数 * 返回true or false * * @param dates * @param intervalDate * @param magic * @return * @throws ParseException */ public static boolean attack(List<String> dateStrs,long intervalDate,int magic) { if(dateStrs == null || dateStrs.size() < magic){ return false; } List<Date> dates = new ArrayList<Date>(); for(String date : dateStrs){ if(date != null && !"".equals(date)) dates.add(stringToDate(date,"yyyy-MM-dd hh:mm:ss")); } Collections.sort(dates); return judgeAttack(dates,intervalDate,magic); } public static boolean judgeAttack(List<Date> sequenceDates,long intervalDate,int magic){ if(sequenceDates == null || sequenceDates.size() < magic){ return false; } for(int x = 0; x < sequenceDates.size() && x <= sequenceDates.size() - magic;x++){ Date dateAfter5 = new Date(sequenceDates.get(x).getTime() + intervalDate); int count = 1; for(int i = x + 1;i< sequenceDates.size();i++){ Date compareDate = sequenceDates.get(i); if(compareDate.before(dateAfter5)) count ++ ; else break; } if(count >= magic) return true; } return false; } /** * 判断在间隔时间内,是否有大于magic的上限的数据集合, * 如果有,则返回满足条件的集合 * 如果找不到满足条件的,就返回null * * @param sequenceDates 已经按照时间顺序排序了的数组 * @param intervalDate * @param magic * @return */ public static List<Date> attackTimes(List<Date> sequenceDates,long intervalDate,int magic){ if(se