MongoDB的分片主要是指将集合拆分成小块并分别存在不同服务器上的过程。MongoDB支持自动分片,可摆脱手动分片管理上的困难。
在以下情况下需要运用分片:
1.服务器的磁盘不够用。
2.单个Mongod不能满足写数据的性能需求。
3.需要将大数据放入内存中提高性能。
下图为我们要实现的分片结构:
从图中我们看到原本一台Mongod节点被分成了A和B两个分片,由路由D读取配置服务器C的分片策略,然后决定数据存储在哪个分片上,而路由隐藏了决策的细节,用户直接访问路由就可以享受分片带来的优点,而不必关心路由读取分片的细节。
配置步骤:
1.创建配置服务器C
创建配置文件如下,端口为10000,并启动mongod -f config.cnf
dbpath=D:\mongodb\test\sharded\C\Data
bind_ip=127.0.0.1
port=10000
2.创建路由服务器D
路由器的配置文件如下:
bind_ip=127.0.0.1
port=20000
configdb=127.0.0.1:10000
其中configdb=127.0.0.1:10000配置的是路由监听的配置服务器的地址
路由器用mongos启动
mongos -f config.cnf
注意:配置服务器要先启动,因为路由服务器需要监听配置服务器。
3.新建分片服务器A和B
A的配置,然后启动A
dbpath=D:\mongodb\test\sharded\A\Data
bind_ip=127.0.0.1
port=8001
B的配置,然后启动B
dbpath=D:\mongodb\test\sharded\B\Data
bind_ip=127.0.0.1
port=8002
4.建立集群中分片服务器与路由器的连接
这个操作需要在路由器中配置,打开路由器的shell,执行数据库命令
db.runCommand({addshard:"127.0.0.1:8001",allowLocal:true})
db.runCommand({addshard:"127.0.0.1:8002",allowLocal:true})
可以看到执行添加分片的操作要在admin库中进行
5.为业务数据库添加分片功能
为person添加分片功能,在路由中执行db.runCommand({"enablesharding":"person"})
6.为集合进行分片
片键:集合中的一个键作为分拆的依据。
为person库的集合info进行分片,key字段设置了片键
执行下列命令db.runCommand({"shardcollection":"person.info","key":{"_id":1}})
7.添加一定的大数据量,测试分片的功能
用脚本插入80万条数据
for(i=1;i<=800001;i++){
db.info.insert({name:i})
}
在分别在分片服务器A和B上查询数据量,如图:看到这800001条数据已经分别存放在2个分片中了
正式环境配置
成功地构建分片需要如下条件:
1.多个配置服务器
2.多个mongos服务器
3.每个片都是副本集
1.多个配置服务器
创建配置服务器如上,现在启动mongos的时候应将其连接到这3个配置服务器。假如3个配置文件的端口号是20001~20003
mongos --configdb localhost:20001,localhost:20002,localhost:20003
配置服务器使用的是两步提交机制,不是普通的MongoDB的异步复制,来维护集群配置的不同副本。这样能保证集群状态的一致性。这意味着某台配置服务器down了后,集群配置信息将是只读的。但是客户端还是能够读写的,只有所有配置服务器备份了以后才能重新均衡数据。
2.多个mongos
Mongos的数量不受限制,建议针对一个应用服务器只运行一个mongos进程。这样每个应用服务器就可以与mongos进行本地会话。
3.每个片都是副本集
生产环境中,每个片都应是副本集。这样单个服务器坏了,就不会导致整个片失效。用addshard命令就可以讲副本集作为片添加,添加时只要指定副本集的名字和种子就好了。
实现即达到数据的分片存储也实现备份和故障自动修复功能,可以副本集和分片混合使用,构建如下图的架构,(为了举例简单仅给shardA做了副本集配置)
1.修改A的配置如下:
dbpath=D:\mongodb\test\sharded\A\Data
bind_ip=127.0.0.1
port=8001
replSet=replicademo/127.0.0.1:8003
新添加A1和A2两台节点与A组成副本集
A1的配置:
dbpath=D:\mongodb\test\sharded\A1\Data
bind_ip=127.0.0.1
port=8004
replSet=replicademo/127.0.0.1:8003
A2的配置:
dbpath=D:\mongodb\test\sharded\A2\Data
bind_ip=127.0.0.1
port=8003
replSet=replicademo/127.0.0.1:8001
在A的shell中执行副本集的舒适化
db.runCommand({"replSetInitiate":
{
"_id":'replicademo',
"members":[
{
"_id":1,
"host":"127.0.0.1:8001"
},
{
"_id":2,
"host":"127.0.0.1:8003"
},
{
"_id":3,
"host":"127.0.0.1:8004"
}
]
}
})
这样A,A1,A2的副本集就建立完成,查询配置看到A为活跃节点,如下图:
回到路由器上设置分片配置
mongos> db.runCommand({addshard:"replicademo/127.0.0.1:8001"})
这样mongos会知道它所连接的是replicademo副本集,在活跃节点down掉之后就会去寻找新的活跃结点。
执行db.printShardingStatus(),会看到副本集的节点都已经自动的配置进来了,如下图:
管理分片
分片的信息主要存放在config数据库上,这样就能被任何连接到mongos的进程访问到了。
配置集合
下面的代码
Mongodb提供了简单易用的导出和导入命令。
一、导出命令的简介与使用。
MongoDB提供了一个简单的导出工具,这个工具位于{MongoDB_HOME}/bin/mongoexport.exe,如截图(我的mongodb部署在windows操作系统中,在linux中类似):
下面介绍一下如何使用mongoexport导出命令:mongoexport -c gis -d local -o likehua.data
其中-c 表示几何
-d 表示数据库
-o 输出文件名。
更多参数详见:
Administrator@F523540 d:/Mongodb/bin
$ mongoexport --help
Export MongoDB data to CSV, TSV or JSON files.
options:
--help produce help message
-v [ --verbose ] be more verbose (include multiple times
for more verbosity e.g. -vvvvv)
--version print the program's version and exit
-h [ --host ] arg mongo host to connect to ( <set
name>/s1,s2 for sets)
--port arg server port. Can also use --host
hostname:port
--ipv6 enable IPv6 support (disabled by
default)
-u [ --username ] arg username
-p [ --password ] arg password
--authenticationDatabase arg user source (defaults to dbname)
--authenticationMechanism arg (=MONGODB-CR)
authentication mechanism
--dbpath arg directly access mongod database files
in the given path, instead of
connecting to a mongod server - needs
to lock the data directory, so cannot
be used if a mongod is currently
accessing the same path
--directoryperdb each db is in a separate directly
(relevant only if dbpath specified)
--journal enable journaling (relevant only if
dbpath specified)
-d [ --db ] arg database to use
-c [ --collection ] arg collection to use (some commands)
-f [ --fields ] arg comma separated list of field names
e.g. -f name,age
--fieldFile arg file with fields names - 1 per line
-q [ --query ] arg query filter, as a JSON string
--csv export to csv instead of json
-o [ --out ] arg output file; if not specified, stdout
is used
--jsonArray output to a json array rather than one
object per line
-k [ --slaveOk ] arg (=1) use secondaries for export if
available, default true
--forceTableScan force a table scan (do not use
$snapshot)
二、导入命令的简介与使用。
导入命令的位置,如图:
使用:
dministrator@F523540 d:/Mongodb/bin
mongoimport -d
需要引入Hadoop和Hbase的jar包,我这里HBase用的是hbase-0.90.5版本,所以我这里引入的HBase的jar包是hbase-0.90.5.jar和zookeeper-3.3.2.jar。
一些常用的API操作:
package cn.luxh.app.util;
import java.io.IOException;
import java.util.Arrays;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;
public class HBaseUtil {
/**
* 初始化HBase的配置文件
* @return
*/
public static Configuration getConfiguration(){
Configuration conf = HBaseConfiguration.create();
//和hbase-site.xml中配置的一致
conf.set("hbase.zooker.quorum", "h1,h2,h2");
return conf;
}
/**
* 实例化HBaseAdmin,HBaseAdmin用于对表的元素据进行操作
* @return
* @throws MasterNotRunningException
* @throws ZooKeeperConnectionException
*/
public static HBaseAdmin getHBaseAdmin() throws MasterNotRunningException, ZooKeeperConnectionException{
return new HBaseAdmin(getConfiguration());
}
/**
* 创建表
* @param tableName 表名
* @param columnFamilies 列族
* @throws IOException
*/
public static void createTable(String tableName,String...columnFamilies) throws IOException {
HTableDescriptor htd = new HTableDescriptor(tableName.getBytes());//
for(String fc : columnFamilies) {
htd.addFamily(new HColumnDescriptor(fc));
}
getHBaseAdmin().createTable(htd);
}
/**
* 获取HTableDescriptor
* @param tableName
* @return
* @throws IOException
*/
public static HTableDescriptor getHTableDescriptor(byte[] tableName) throws IOException{
return getHBaseAdmin().getTableDescriptor(tableName);
}
/**
* 获取表
* @param tableName 表名
* @return
* @throws IOException
*/
public static HTable getHTable(String tableName) throws IOException{
return new HTable(getConfiguration(),tableName);
}
/**
* 获取Put,Put是插入一行数据的封装格式
* @param tableName
* @param row
* @param columnFamily
* @param qualifier
* @param value
* @return
* @throws IOException
*/
public static Put getPut(String row,String columnFamily,String qualifier,String value) throws IOException{
Put put = new Put(row.getBytes());
if(qualifier==null||"".equals(qualifier)) {
put.add(columnFamily.getBytes(), null, value.getBytes());
}else {
put.add(columnFamily.getBytes(), qualifier.getBytes(), value.getBytes());
}