当前位置:  互联网>综合
本页文章导读:
    ▪单机对大数据的排序处理      引用请声明原文:http://blog.csdn.net/duck_genuine/article/details/9155705 由于引用数据以hash的方式放在不同的文件里需要将其合并排序写到一个文件。数据量暂时是有几千万级别。 文件的每行是一条jso.........
    ▪数据挖掘之聚类      聚类属于无监督学习。 聚类的算法有很多种,其可分为基于划分、层次、密度、网格及模型的聚类方法。 根据数据集的不同,需要采用不同的聚类算法和策略。 1. 选择聚类算法,所面.........
    ▪编程珠玑---读书笔记---第11章排序      插入排序: 对于小型的排序任务速度很快,它是稳定的,只需要O(1)的额外空间,基于比较和交换的次数为O(n^2)。 #include <iostream> #include <string> #include <cstdio> #include <cmath> #incl.........

[1]单机对大数据的排序处理
    来源: 互联网  发布时间: 2013-10-25

引用请声明原文:http://blog.csdn.net/duck_genuine/article/details/9155705

由于引用数据以hash的方式放在不同的文件里需要将其合并排序写到一个文件。数据量暂时是有几千万级别。

文件的每行是一条json格式的记录,格式如下:

 { "_id" : { "$oid" : "51ace243bb15094b6c40ada5" }, "count" :1, "domain" : "qq.com", "vid" : "92312592" }


将这个过程的实现做成一个类似MapReduce的过程:

mapSplit--->shuffler-->sort-->meger


然后将处理好的文件加载到内存,放到list集合中,再使用二分查找方式,不使用map,是因为map占用的内存比实际数据更大。


  • 先看一下程序的入口代码



package com.wole.indexing.refer;

import java.io.File;
import java.io.IOException;
import java.util.Collection;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * @author yzygenuine
 * 
 *         整合map,shuffler,merge程序,定时将数据处理写到指定文件中
 */
public class MapReduceDriver {

	private static final Logger logger = LoggerFactory.getLogger(MapReduceDriver.class);
	private static final Logger mail_log = LoggerFactory.getLogger("mail_log");
	

	public static void main(String[] args) {
		logger.info("starting.....");
		long start=System.currentTimeMillis();
		String inputDir = args[0];
		File dir = new File(inputDir);
		if (!dir.exists()) {
			logger.warn("目录不存在");
			System.exit(-1);
		}
		Collection<File> sourceFile = FileUtils.listFiles(dir, new String[] { "json" }, false);
		logger.info("sourceFiles size:" + sourceFile.size());
		logger.info("开始mapSplit");
		for (File f : sourceFile) {
			logger.info("mapSplit:"+f.getAbsolutePath());
			MapSplit map = new MapSplit();
			map.setInputFile(f);
			map.setOutputFile(new File(f.getAbsolutePath() + ".map"));
			map.setBatchSize(5000);
			map.init();
			map.map();
		}
		logger.info("完成mapsplit");
		
		Collection<File> mapFile = FileUtils.listFiles(dir, new String[] { "map" }, false);
		for (File f : mapFile) {
			logger.info("shuffler:"+f.getAbsolutePath());
			Shuffler shuffler=new Shuffler();
			shuffler.toShuffler(f);
		}
		logger.info("完成shuffler");
		logger.info("开始merge.......");
		String mergerFile="merger_refer";
		MergeFile merge=new MergeFile(new File(dir,mergerFile));
		
		Collection<File> shufflerFile = FileUtils.listFiles(dir, new String[] { "shuffler" }, false);
		try {
			merge.merge(shufflerFile);
		} catch (IOException e) {
			logger.error("",e);
		}
		logger.info("merge结束.......");
		long end = System.currentTimeMillis();
		long total=(end-start)/1000;
		logger.info("总共耗时:"+total);
		logger.info("end!");
		
		
	}

}

  • mapSplit的实现:


package com.wole.indexing.refer;

import gnu.trove.map.hash.TIntIntHashMap;
import gnu.trove.map.hash.TIntObjectHashMap;
import gnu.trove.procedure.TIntIntProcedure;
import gnu.trove.procedure.TIntObjectProcedure;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.chenlb.solr.indexing.DataSource;
import com.wole.indexing.util.Occupy;

/**
 * @author yzygenuine
 *
 */
public class MapSplit{
	private static final Logger logger = LoggerFactory.getLogger(MapSplit.class);
	private static final Logger mail_log = LoggerFactory.getLogger("mail_log");
	private File inputFile;
	private File outputFile;

	private LineIterator lineIter;
	private Map<String, Integer> domainMapNum = new HashMap<String, Integer>();

	private int batchSize = 5000;

	public boolean hasNext() {
		return lineIter.hasNext();
	}

	public void init() {
		logger.info("初始化");
		try {
			LineIterator lineIter = FileUtils.lineIterator(inputFile);
			if (lineIter != null) {
				this.lineIter = lineIter;
			}
		} catch (Exception e) {
			logger.error("", e);
			mail_log.info("from "+MapSplit.class.toString());
			mail_log.error("文件是否配置有问题,读取失败");
			throw new RuntimeException("文件是否配置有问题,读取失败");
			
		}

		domainMapNum.put("renren.com", 1);
		domainMapNum.put("qq.com", 2);

	}

	private TIntObjectHashMap<TIntIntHashMap> vidMap = new TIntObjectHashMap<TIntIntHashMap>();

	/**
	 * 统计总共加载的数据记录数
	 */
	private int count = 0;

	private int bigDomainCount = 0;

	public void nextData() {
		while (lineIter.hasNext()) {
			String line = lineIter.nextLine();
			try {
				JSONObject json = new JSONObject(line);
			
				Integer obj_count = json.getInt("count");
				String obj_domain = json.getString("domain");

				// 处理domain,只取后缀,找出对应的num
				int obj_vid = json.getInt("vid");
				TIntIntHashMap domain_count_map = vidMap.get(obj_vid);
				if (domain_count_map == null) {
					domain_count_map = new TIntIntHashMap();
					vidMap.put(obj_vid, domain_count_map);
				}
				// 查找并统计
				int domainNum = 0;
				Iterator<String> domainIter = domainMapNum.keySet().iterator();
				while (domainIter.hasNext()) {
					String key = domainIter.next();
					if (obj_domain.indexOf(key) != -1) {
						domainNum = domainMapNum.get(key);
						break;
					}
				}
				int domainCount = domain_count_map.get(domainNum);
				domainCount += obj_count;
				domain_count_map.put(domainNum, domainCount);
			} catch (JSONException e) {
				logger.error("", e);
			}
			count++;
			if (count % batchSize == 0) {
				break;
			}
		}
		logger.info("count:" + count);
	}

	final Set<String> domainSet = new HashSet<String>();

	public void map() {
		while (hasNext()) {
			nextData();
		}
		// 写入文件
		try {
			outputFile();
		} catch (IOException e) {
			logger.error("", e);
		}
	}

	final static String separator = System.getProperty("line.separator");
	private void outputFile() throws IOException {
		
		final String split = ",";
		final BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(outputFile));
		vidMap.forEachEntry(new TIntObjectProcedure<TIntIntHashMap>() {
			@Override
			public boolean execute(int vid, TIntIntHashMap domain_count_map) {
				final StringBuilder sb = new StringBuilder();
				sb.append(vid);
				sb.append(split);
				domain_count_map.forEachEntry(new TIntIntProcedure() {
					@Override
					public boolean execute(int domainNum, int domain_count) {
						sb.append(domainNum);
						sb.append(":");
						sb.append(domain_count);
						sb.append(split);
						return true;
					}
				});
				sb.append(separator);
				String line = sb.toString();
				try {
					out.write(line.getBytes("utf-8"), 0, line.length());
				} catch (Exception e) {
					logger.error("", e);
				}
				return true;
			}
		});

		try {
			out.close();
		} catch (Exception e) {
			logger.error("", e);
		}
		logger.info("写文件完成");

	}

	public void setInputFile(File inputFile) {
		this.inputFile = inputFile;
	}

	public void setOutputFile(File outputFile) {
		this.outputFile = outputFile;
	}

	public void setBatchSize(int batchSize) {
		this.batchSize = batchSize;
	}

	public static void main(String[] args) {
		String inputFile = args[0];
		String outputFile = args[1] + ".map";

		MapSplit map = new MapSplit();
		map.setInputFile(new File(inputFile));
		map.setOutputFile(new File(outputFile));
		map.setBatchSize(5000);
		map.init();
		map.map();
	}
}



  • shuffler实现代码如下:

package com.wole.indexing.refer;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.dsa.Sort;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.wole.indexing.util.Occupy;

/**
 * @author yzygenuine
 *中间结果排序
 */
public class Shuffler {
	private static final Logger logger = LoggerFactory.getLogger(MapSplit.class);
	private static final Logger mail_log = LoggerFactory.getLogger("mail_log");

	public void toShuffler(File inputFile) {
		LineIterator iter = null;
		try {
			iter = FileUtils.lineIterator(inputFile);
		} catch (IOException e) {
			logger.error("", e);
		}
		if (iter == null) {
			return;
		}
		//读取文件到list集合
		List<VidInfo> list = new ArrayList<VidInfo>();
		while (iter.hasNext()) {
			String line = (String) iter.next();
			VidInfo vidInfo=VidInfo.lineToVidInfo(line);
			list.add(vidInfo);
		}

		//将数组做排序
		VidInfo[] array = new VidInfo[list.size()];
		list.toArray(array);
		Sor      
    
[2]数据挖掘之聚类
    来源: 互联网  发布时间: 2013-10-25

聚类属于无监督学习。

聚类的算法有很多种,其可分为基于划分、层次、密度、网格及模型的聚类方法。

根据数据集的不同,需要采用不同的聚类算法和策略。


1. 选择聚类算法,所面临的常见问题又哪些?

1) 不同形状的数据集。不同形状的数据集,也需要采取不同的度量策略,或者不同的聚类算法。

2)不同的数据次序。相同数据集,但数据输入次序不同,也会造成聚类的结果的不同。

3)噪声。不同的算法,对噪声的敏感程度不同。


2. 在高维的欧式空间,什么是“维数灾难”?

在高维下,所有点对的距离都差不多(如欧式距离),或者是几乎任意两个向量都是正交(利用夹角进行进行度量),这样聚类就很困难。


3. 常见的聚类算法的策略有哪些?

1)层次或凝聚式聚类。采取合并的方式,将邻近点或簇合并成一个大簇。

2)点分配。每次遍历数据集,将数据分配到一个暂时适合的簇中,然后不断更新。


4. 层次聚类算法的复杂度是多少?

每次合并,都需计算出两个点对之间的距离,复杂度是O(n^2), 后续步骤的开销,分布正比与O((n-1)^2), O((n-2)^2)...,这样求和算下来,算法复杂度是O(n^3).

算法优化:采用优先队列/最小堆来优化计算。优先队列的构建,第一步需要计算出每两个点的距离,这个开销是O(N^2). 一般情况下,N个元素,单纯的优先队列的构建开销为O(N),若是N^2个距离值,则建堆的开销是O(N^2)。

第二步,合并,合并需要一个删除、计算和重新插入的过程。因为合并一个簇对,就需要更新N个元素,开销为O(N*logN)。总的开销为O((N^2) * logN).

所以,总的算法复杂度为O((N^2) * logN).


5. 欧式空间与非欧式空间下,常见的簇之间的距离度量有哪些?

欧式空间:

1)两个簇之间的质心之间的距离最小

2)两个簇中所有点之间的最短距离

3)两个簇之间所有点对的平均距离

4)将具有最小半径的两个簇进行合并, 簇的半径:簇内的点到质心的最大距离

5)将具有最小直径的两个簇进行合并,簇的直径:簇内任意两点间的最大距离


非欧式空间,簇的中心点定义,该点距离其他点的距离最近,如何计算?

1)该点到簇中其他所有点的距离之和(求和),1-范数

2)该点到簇中其他点的最大距离(最大值),无穷-范数

3)该点到簇中其他点的平方和(平方和),2-范数


6. k-means算法,k均值算法

点分配式的聚类算法。一般用于球形或凸集的数据集。

算法步骤如下:

1)初始化k个选择点作为最初的k个簇的中心

2)计算每个点分别到k个簇的中心,并将点分配到其距离最近的簇中

3)由分配的点集,分别更新每个簇的中心,然后回到2,继续算法,直到簇的中心变化小于某个阈值


7. k-means算法的两个问题?

1)初始化选择点;常用的方式是尽量选择距离比较远的点(方法:依次计算出与已确定的点的距离,并选择距离最大的点),或者首先采取层次聚类的方式找出k个簇

2)如何选取k值;k值选取不当,会导致的问题?当k的数目低于真实的簇的数目时,平均直径或其他分散度指标会快速上升

可以采用多次聚类,然后比较的方式。多次聚类,一般是采用1, 2, 4, 8...数列的方式,然后找到一个指标在v/2, v时,获取较好的效果,然后再使用二分法,在[v/2, v]之间找到最佳的k值。


8. CURE算法

使用场景:

任何形状的簇,如S形、环形等等,不需要满足正态分布,欧式空间,可以用于内存不足的情况

特征:

簇的表示不是采用质心,而是用一些代表点的集合来表示。


算法步骤:

1)初始化。抽取样本数据在内存中进行聚类,方法可以采用层次聚类的方式,形成簇之后,从每个簇中再选取一部分点作为簇的代表点,并且每个簇的代表点之间的距离尽量远。对每个代表点向质心移动一段距离,距离的计算方法:点的位置到簇中心的距离乘以一个固定的比例,如20%。

2)对簇进行合并。当两个簇的代表点之间足够近,那么就合并这两个簇,直到没有更足够接近的簇。

3)点分配。对所有点进行分配,即将点分配给与代表点最近的簇。


9. GRGPF算法

场景:

非欧式空间,可用于内存不足的情况(对数据抽样)

特征:

同时使用了层次聚类和点分配的的思想。

如何表示簇?

数据特征:簇包含点的数目,簇中心点,离中心点最近的一些点集和最远的一些点集,ROWSUM(p)即点p到簇中其他店的距离平方和。靠近中心的点集便于修改中心点的位置,而远离中心的点便于对簇进行合并。

簇的组织:类似B-树结构。首先,抽取样本点,然后做层次聚类,就形成了树T的结构。然后,从树T中选取一系列簇,即是GRGPF算法的初始簇。然后将T中具有相同祖先的簇聚合,表示书中的内部节点。

点的分配:对簇进行初始化之后,将每个点插入到距离最近的那个簇。

具体处理的细节更为复杂,如果对B-树比较了解,应该有帮助。


10. 流聚类,如何对最近m个点进行聚类?

N个点组成的滑动窗口模型,类似DGIM算法中统计1的个数。

1)首先,划分桶,桶的大小是2的次幂,每一级桶的个数最多是b个。

2)其次,对每个桶内的数据进行聚类,如采用层次聚类的方法。

3)当有新数据来临,需要新建桶,或者合并桶,这个类似于GDIM,但除了合并,还需要合并簇,当流内聚类的模型变化不是很快的时候,可以采取直接质心合并的方式。

4)查询应答:对最近的m个点进行聚类,当m不在桶的分界线上时,可以采用近似的方式求解,只需求出 包含m个点的最少桶 的结果


作者:viewcode 发表于2013-6-24 18:00:11 原文链接
阅读:0 评论:0 查看评论

    
[3]编程珠玑---读书笔记---第11章排序
    来源: 互联网  发布时间: 2013-10-25

插入排序:

对于小型的排序任务速度很快,它是稳定的,只需要O(1)的额外空间,基于比较和交换的次数为O(n^2)。

#include <iostream>
#include <string>
#include <cstdio>
#include <cmath>
#include <vector>
#include <algorithm>
#include <sstream>
#include <cstdlib>
#include <fstream>
#include <queue>
using namespace std;
int x[8]={55,41,59,26,53,58,97,93};
int main()
{
	//ifstream fin;
	//fin.open("data1.txt");
	for(int i=1;i<8;i++)
		for(int j=i;j>0 && x[j]<x[j-1];j-- )
		{
			int t=x[j];
			x[j]=x[j-1];
			x[j-1]=t;
		}
	for(int i=0;i<8;i++)cout<<x[i]<<" ";
	cout<<endl;
	
	return 0;

}

 

快速排序:

如果n很大,快速排序的O(n*logn)的运行时间就很关键了,在结合随机划分和双向划分后,对于任意的n元输入数组,快排的期望运行时间正比于 n logn。

下面代码的版本是基于第一个元素进行划分,对于随机输入的数据这是没有问题的,但是对于某些常见输入,比如数组已基本有序,那么最坏情况下需要

O(n^2)的时间,这时候,我们可以才用随机划分元素的方法,可以改善性能,通过把第一个元素和后面所有元素中的一个随机项交换来实现这一点:

#include <iostream>
#include <string>
#include <cstdio>
#include <cmath>
#include <vector>
#include <algorithm>
#include <sstream>
#include <cstdlib>
#include <fstream>
#include <queue>
using namespace std;
int x[8]={55,41,59,26,53,58,97,93};
void qsort1(int l,int u)
{
	if(l>=u)return;
	int m=l;
	for(int i=l+1;i<=u;i++)
	{
		if(x[i]<x[l]){
			m++;
			int t=x[m];
			x[m]=x[i];
			x[i]=t;
		}
	}
	int t=x[l];
	x[l]=x[m];
	x[m]=t;
	qsort1(l,m-1);
	qsort1(m+1,u);
	
}
int main()
{
	qsort1(0,7);
	
	for(int i=0;i<8;i++)cout<<x[i]<<" ";
	cout<<endl;
	
	return 0;

}

 

顺便说一下:C库函数qsort非常简单相对较快,但是它比我们自己写的快排慢,仅仅是因为其通用而灵活的接口对每次比较都使用函数调用,C++库函数sort具有最简单的

接口:我们通过调用sort(x,x+n)来对数组x排序,实现也很高效。

作者:xiaozhuaixifu 发表于2013-6-24 17:02:25 原文链接
阅读:75 评论:0 查看评论

    
最新技术文章:
▪用户及权限基础 2---- Linux权限    ▪用户及权限基础 3---- Linux扩展权限    ▪git 简明教程(1) --创建及提交
▪背包 代码    ▪json对象的封装与解析    ▪01背包,完全背包,多重背包 ,模板代码
▪apache安装详解    ▪HDU 4668 Finding string (解析字符串 + KMP)    ▪《TCP-IP详解 卷1:协议》学习笔记(二)
▪《TCP-IP详解 卷1:协议》学习笔记(持续更新...    ▪windows下使用swig    ▪gensim试用
▪Linux Shell脚本编程--nc命令使用详解    ▪solr对跨服务器表联合查询的配置    ▪递归和非递归实现链表反转
▪Linux磁盘及文件系统管理 1---- 磁盘基本概念    ▪Cholesky Decomposition    ▪HTTP协议学习
▪用C语言写CGI入门教程    ▪用hdfs存储海量的视频数据的设计思路    ▪java多线程下载的实现示例
▪【原创】eAccelerator 一个锁bug问题跟踪    ▪hadoop学习之ZooKeeper    ▪使用cuzysdk web API 实现购物导航类网站
▪二维数组中的最长递减子序列    ▪内嵌W5100的网络模块WIZ812MJ--数据手册    ▪xss 跨站脚本攻击
▪RobotFramework+Selenium2环境搭建与入门实例    ▪什么是API    ▪用PersonalRank实现基于图的推荐算法
▪Logtype    ▪关于端口号你知道多少!    ▪Linux基本操作 1-----命令行BASH的基本操作 iis7站长之家
▪CI8.7--硬币组合问题    ▪Ruby on Rails 学习(五)    ▪如何使用W5300实现ADSL连接(二)
▪不允许启动新事务,因为有其他线程正在该会...    ▪getting start with storm 翻译 第六章 part-3    ▪递归求排列和组合(无重复和有重复)
▪工具类之二:RegexpUtils    ▪Coding Interview 8.2    ▪Coding Interview 8.5
▪素因子分解 Prime factorization    ▪C# DllImport的用法    ▪图的相关算法
▪Softmax算法:逻辑回归的扩展    ▪最小生成树---Kruskal算法---挑战程序设计竞赛...    ▪J2EE struts2 登录验证
▪任意两点间的最短路径---floyd_warshall算法    ▪Sqoop实现关系型数据库到hive的数据传输    ▪FFMPEG采集摄像头数据并切片为iPhone的HTTP Stream...
▪Ubuntu 13.04 – Install Jetty 9    ▪TCP/IP笔记之多播与广播    ▪keytool+tomcat配置HTTPS双向证书认证
▪安装phantomjs    ▪Page Redirect Speed Test    ▪windows media player 中播放pls的方法
▪sre_constants.error: unbalanced parenthesis    ▪http headers    ▪Google MapReduce中文版
▪The TCP three-way handshake (connect)/four wave (closed)    ▪网站反爬虫    ▪Log4j实现对Java日志的配置全攻略
▪Bit Map解析    ▪Notepad 快捷键 大全    ▪Eclipse 快捷键技巧 + 重构
▪win7 打开防火墙端口    ▪Linux Shell脚本入门--awk命令详解    ▪Linux Shell脚本入门--Uniq命令
▪Linux(Android NDK)如何避免僵死进程    ▪http Content-Type一览表    ▪Redis实战之征服 Redis + Jedis + Spring (二)
▪Tomcat7.0.40 基于DataSourceRealm的和JDBCRealm的资源...    ▪利用SQOOP将ORACLE到HDFS    ▪django输出 hello world
▪python re    ▪unity3D与网页的交互    ▪内存共享基本演示
▪python join    ▪不再为无限级树结构烦恼,且看此篇    ▪python实现变参
▪打开文件数限制功能不断地制造问题    ▪Arduino Due, Maple and Teensy3.0 的 W5200性能测试    ▪Selenium实例----12306网站测试
▪基于协同过滤的推荐引擎    ▪C4.5决策树    ▪C#HTTP代理的实现之注册表实现
▪nosql和关系型数据库比较?    ▪如何快速比较这两个字符串是否相等?    ▪hdoj 1863 畅通工程 最小生成树---prime算法
 


站内导航:


特别声明:169IT网站部分信息来自互联网,如果侵犯您的权利,请及时告知,本站将立即删除!

©2012-2021,,E-mail:www_#163.com(请将#改为@)

浙ICP备11055608号-3