第一步、开发环境:
Win7 64位(注:MongoDB在32位Windows上有数据量限制(约2GB),详见官方文档)
MongoDB 3.2
mongo-java-driver 3.2.2
第二步、安装MongoDB,并开启服务
略:可参见官方文档
第三步、代码
import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.util.LinkedList;import java.util.List;import org.bson.Document;import com.mongodb.MongoClient;import com.mongodb.MongoWriteException;import com.mongodb.client.MongoCollection;import com.mongodb.client.MongoDatabase;/** * created by soarhu 2016/4/21 */public class MongodbBatchInsetUtils { static final int ThreadNum=3;//设置向MongoDb中插入数据的线程数 static int ThreadSizeCount = 0;//用于计算子线程完成数 static final String HOST = "127.0.0.1";//主机 static final int PORT = 27017;//端口 static final String DATABASE_NAME="mydb";//存储数据库名称,如果不存在会自动创建数据库 static final String COLLECTION_NAME="md";//存储Collection public static final String DIR = "E:\\targets";//扫描文件路径 public static final String FILE_SUFFIX = "html";//扫描文件类型,不设置,默认为所有文件 public static final String CHARSET = "UTF-8";//文件处理编码格式 public static void main(String[] args) { MongoClient client =new MongoClient(HOST,PORT); MongoDatabase dataBase = client.getDatabase(DATABASE_NAME); MongoCollectioncollection = dataBase.getCollection(COLLECTION_NAME); Pool p = new Pool(); Produce pro = new Produce(p); Long startTime = System.currentTimeMillis(); new Thread(pro).start();//开启从磁盘读取文件的线程 Thread[] th = new Thread[ThreadNum]; for(int i=0;i 0){ File file = new File(dir.trim()); if(file.exists() && file.isDirectory()){ File[] flist = file.listFiles(); if(null!=flist && flist.length>0){ for(File f:flist){ if(f.isFile()){ if(null==suffix|| "".equals(suffix)){ pool.putFile(f); } if(null!=suffix &&suffix.trim().length()>0){ if(f.getName().endsWith(suffix.trim())){ pool.putFile(f); }else{ throw new RuntimeException("找不到对应文件类型");} } }else{ getFilesInDir(f.getAbsolutePath(),suffix); } } }else{ throw new RuntimeException("文件内容为空");} }else{ throw new RuntimeException("目录不存在,请检查路径正确性!");} } }}//消费者,向mongoDb中写数据class Customer implements Runnable{ private Pool pool=null; MongoCollection collection = null; public 
Customer(Pool pool,MongoCollection collection){ this.pool = pool; this.collection = collection; } @Override public void run() { while(true){ File f = pool.fetchFile(); if(null==f){ return ; } try { saveToMonGoDb(f);// if(pool.hasUploadToDB%1000==0)// System.out.println("已写入数据:"+pool.hasUploadToDB); } catch (MongoWriteException e) { System.out.println("写入数据库异常:"+e.getMessage()); return ; } if(pool.getSize()==0){ System.out.println(Thread.currentThread().getName()+" :WRITTING FINISHED!!"); MongodbBatchInsetUtils.ThreadSizeCount++; } } } //将文件以文件名为id,文件内容为值保存在数据库中 private void saveToMonGoDb(File file){ String _id = file.getName().substring(0,file.getName().lastIndexOf(".")); String content = readFileContext(file, MongodbBatchInsetUtils.CHARSET); Document document = new Document("_id",_id).append("content", content); collection.insertOne(document); } //读取文件内容,以charSet编码处理 public static String readFileContext(File file,String charSet) { StringBuilder sb; BufferedReader reader=null; try { reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charSet)); String line = null; sb = new StringBuilder(); while(null!=(line = reader.readLine())){ sb.append(line+"\n"); } return sb.toString(); }catch (Exception e) { System.out.println("文件读取失败!"+e.getMessage()); }finally{ try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } return null; } }//池,缓冲区class Pool{ volatile int size=0;//缓冲区中条目数量 volatile int limit =1000; volatile int hasUploadToDB=0; volatile private List files = new LinkedList (); //入栈 public synchronized void putFile(File file){ while(files.size()==limit){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } files.add(file); notifyAll(); ++size; } //出栈 public synchronized File fetchFile(){ while(files.size()==0 ){ try { this.wait(); } catch (InterruptedException e) { return null; } } File file = null; notify(); if(files.size()>0){ file = files.remove(0); --size; ++hasUploadToDB; } return file; } public 
int getSize(){ return this.size; } }