当前位置: 编程技术>java/j2ee
深入Sqlite多线程入库的问题
来源: 互联网 发布时间:2014-10-25
本文导语: 今天经理给了我一个三十多M的sql文件,让我测试数据定位的问题。按照惯例,我使用navicat for sqlite创建一个表,然后将sql文件导入。我然后去干其他事儿了,大约过了一个多小时,我想数据应该导入的差不多了吧。我打开一...
今天经理给了我一个三十多M的sql文件,让我测试数据定位的问题。按照惯例,我使用navicat for sqlite创建一个表,然后将sql文件导入。我然后去干其他事儿了,大约过了一个多小时,我想数据应该导入的差不多了吧。我打开一看,汗,死在那儿了。我关掉软件又重新导入一遍,还是那个德行。又得知经理曾经自己也导过,没有成功。看来,用工具导入的方法行不通了。
但是,想想就十多万条数据,就是十多万条insert sql语句,有那么难吗?于是,我想还是自己写一个程序导入吧。虽然中间也遇到一些小插曲,但是还是成功地把数据导进去了。
程序的代码如下:
package com.geoway.pad.common.tool;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author likehua
* @note SQLite建库以及批量入库
* */
public class BatchTool{
//ddl
private static String ddl="CREATE TABLE IF NOT EXISTS pbeijing_point (OBJECTID INTEGER,NAME TEXT,ADDRESS TEXT,PHONE TEXT,FAX TEXT,TYPE TEXT,CITYCODE TEXT,URL TEXT,EMAIL TEXT,NAME2 TEXT,X INTEGER,Y INTEGER)";
Connection jCon=null;
//get connection
public synchronized Connection getConnection(){
if(jCon==null){
// json=
Statement state=null;
try {
Class.forName("org.sqlite.JDBC");
jCon=DriverManager.getConnection("jdbc:sqlite:c:\newD.db");
state=jCon.createStatement();
state.executeUpdate(ddl);
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return jCon;
}
//创建500个线程
ExecutorService service=Executors.newFixedThreadPool(500);
//读取sql文件 每五百个insert 语句由一个线程批量操作
public void readBatchSQL(InputStream is) throws IOException{
BufferedReader bufferReader=new BufferedReader(new InputStreamReader(is,"UTF-8"));
String line;
String one="";
int tag=0;
String batchSql="";
while((line=bufferReader.readLine())!=null){
one+=line;
if(one.indexOf(";")!=-1){
batchSql+=one;
one="";//reset
tag++;
};
//符合条件 开辟一个线程
if(tag!=0&&tag/500!=0){
service.execute(new SQLiteBatchHandler(batchSql));
batchSql="";//reset
tag=0;//reset
}
}
//最后执行 剩余的sql
if(batchSql.length()>0){
System.out.println("finalSQL:"+batchSql);
Runnable r=new SQLiteBatchHandler(batchSql);
service.execute(r);
};
try {
//关闭线程池
this.service.shutdown();
this.service.awaitTermination(1, TimeUnit.HOURS);
getConnection().close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
};
/**
* @note 分割sql
* */
private static String[] splitSQL(String batchSQl){
if(batchSQl!=null){
return batchSQl.split(";");
};
return null;
}
/**
* @note 执行批量更新操作
* 由于connection.comit 操作时 如果存在 statement没有close 就会报错 因此将此方法加上同步 。
* */
private synchronized void exucteUpdate(String batch){
Statement ste=null;
Connection con=null;
try{
con=getConnection();
con.setAutoCommit(false);
ste=con.createStatement();
String[] sqls=this.splitSQL(batch);
for(String sql:sqls){
if(sql!=null){
ste.addBatch(sql);
};
};
ste.executeBatch();
ste.close();
con.commit();//提交
}catch(Exception e){
e.printStackTrace();
System.out.println("执行失败:"+batch);
try {
con.rollback();//回滚
} catch (SQLException e1) {
e1.printStackTrace();
}
}finally{
if(ste!=null){
try {
ste.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
/**
* @author likehua
* @note 入库线程
* */
private class SQLiteBatchHandler implements Runnable{
private String batch;
public SQLiteBatchHandler(String sql){
this.batch=sql;
};
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(this.batch.length()>0){
exucteUpdate(batch);
};
}
}
/**
* @author likehua
* @note 主函数入口
* */
public static void main(String[] args) throws FileNotFoundException, IOException{
BatchTool s=new BatchTool();
s.readBatchSQL(new FileInputStream(new File("c:\poi.sql")));
}
}
但是,想想就十多万条数据,就是十多万条insert sql语句,有那么难吗?于是,我想还是自己写一个程序导入吧。虽然中间也遇到一些小插曲,但是还是成功地把数据导进去了。
程序的代码如下:
代码如下:
package com.geoway.pad.common.tool;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
* @author likehua
* @note SQLite建库以及批量入库
* */
public class BatchTool{
//ddl
private static String ddl="CREATE TABLE IF NOT EXISTS pbeijing_point (OBJECTID INTEGER,NAME TEXT,ADDRESS TEXT,PHONE TEXT,FAX TEXT,TYPE TEXT,CITYCODE TEXT,URL TEXT,EMAIL TEXT,NAME2 TEXT,X INTEGER,Y INTEGER)";
Connection jCon=null;
//get connection
public synchronized Connection getConnection(){
if(jCon==null){
// json=
Statement state=null;
try {
Class.forName("org.sqlite.JDBC");
jCon=DriverManager.getConnection("jdbc:sqlite:c:\newD.db");
state=jCon.createStatement();
state.executeUpdate(ddl);
} catch (SQLException e) {
e.printStackTrace();
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
return jCon;
}
//创建500个线程
ExecutorService service=Executors.newFixedThreadPool(500);
//读取sql文件 每五百个insert 语句由一个线程批量操作
public void readBatchSQL(InputStream is) throws IOException{
BufferedReader bufferReader=new BufferedReader(new InputStreamReader(is,"UTF-8"));
String line;
String one="";
int tag=0;
String batchSql="";
while((line=bufferReader.readLine())!=null){
one+=line;
if(one.indexOf(";")!=-1){
batchSql+=one;
one="";//reset
tag++;
};
//符合条件 开辟一个线程
if(tag!=0&&tag/500!=0){
service.execute(new SQLiteBatchHandler(batchSql));
batchSql="";//reset
tag=0;//reset
}
}
//最后执行 剩余的sql
if(batchSql.length()>0){
System.out.println("finalSQL:"+batchSql);
Runnable r=new SQLiteBatchHandler(batchSql);
service.execute(r);
};
try {
//关闭线程池
this.service.shutdown();
this.service.awaitTermination(1, TimeUnit.HOURS);
getConnection().close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (SQLException e) {
e.printStackTrace();
}
};
/**
* @note 分割sql
* */
private static String[] splitSQL(String batchSQl){
if(batchSQl!=null){
return batchSQl.split(";");
};
return null;
}
/**
* @note 执行批量更新操作
* 由于connection.comit 操作时 如果存在 statement没有close 就会报错 因此将此方法加上同步 。
* */
private synchronized void exucteUpdate(String batch){
Statement ste=null;
Connection con=null;
try{
con=getConnection();
con.setAutoCommit(false);
ste=con.createStatement();
String[] sqls=this.splitSQL(batch);
for(String sql:sqls){
if(sql!=null){
ste.addBatch(sql);
};
};
ste.executeBatch();
ste.close();
con.commit();//提交
}catch(Exception e){
e.printStackTrace();
System.out.println("执行失败:"+batch);
try {
con.rollback();//回滚
} catch (SQLException e1) {
e1.printStackTrace();
}
}finally{
if(ste!=null){
try {
ste.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
/**
* @author likehua
* @note 入库线程
* */
private class SQLiteBatchHandler implements Runnable{
private String batch;
public SQLiteBatchHandler(String sql){
this.batch=sql;
};
@SuppressWarnings("static-access")
@Override
public void run() {
try {
Thread.currentThread().sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
if(this.batch.length()>0){
exucteUpdate(batch);
};
}
}
/**
* @author likehua
* @note 主函数入口
* */
public static void main(String[] args) throws FileNotFoundException, IOException{
BatchTool s=new BatchTool();
s.readBatchSQL(new FileInputStream(new File("c:\poi.sql")));
}
}