博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
利用java多线程向MongoDB中批量插入静态文件
阅读量:4639 次
发布时间:2019-06-09

本文共 5508 字,大约阅读时间需要 18 分钟。

第一步、开发环境:

    win7 64位(注:MongoDb在32位windows上有数量限制(2G),详见官方文档)

    Mongodb3.2

    mongofb_java_driver 3.2.2

第二部、安装mongodb,并开启服务

    略:可参见官方文档

第三部、代码

import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.IOException;import java.io.InputStreamReader;import java.util.LinkedList;import java.util.List;import org.bson.Document;import com.mongodb.MongoClient;import com.mongodb.MongoWriteException;import com.mongodb.client.MongoCollection;import com.mongodb.client.MongoDatabase;/** *  created by soarhu 2016/4/21 */public class MongodbBatchInsetUtils {        static final int ThreadNum=3;//设置向MongoDb中插入数据的线程数    static int ThreadSizeCount = 0;//用于计算子线程完成数    static final String HOST = "127.0.0.1";//主机    static final int PORT = 27017;//端口        static final String DATABASE_NAME="mydb";//存储数据库名称,如果不存在会自动创建数据库    static final String COLLECTION_NAME="md";//存储Collection    public static final String DIR = "E:\\targets";//扫描文件路径    public static final String FILE_SUFFIX = "html";//扫描文件类型,不设置,默认为所有文件    public static final String CHARSET = "UTF-8";//文件处理编码格式        public static void main(String[] args) {                MongoClient client =new MongoClient(HOST,PORT);        MongoDatabase dataBase = client.getDatabase(DATABASE_NAME);        MongoCollection
collection = dataBase.getCollection(COLLECTION_NAME); Pool p = new Pool(); Produce pro = new Produce(p); Long startTime = System.currentTimeMillis(); new Thread(pro).start();//开启从磁盘读取文件的线程 Thread[] th = new Thread[ThreadNum]; for(int i=0;i
0){ File file = new File(dir.trim()); if(file.exists() && file.isDirectory()){ File[] flist = file.listFiles(); if(null!=flist && flist.length>0){ for(File f:flist){ if(f.isFile()){ if(null==suffix|| "".equals(suffix)){ pool.putFile(f); } if(null!=suffix &&suffix.trim().length()>0){ if(f.getName().endsWith(suffix.trim())){ pool.putFile(f); }else{
throw new RuntimeException("找不到对应文件类型");} } }else{ getFilesInDir(f.getAbsolutePath(),suffix); } } }else{
throw new RuntimeException("文件内容为空");} }else{
throw new RuntimeException("目录不存在,请检查路径正确性!");} } }}//消费者,向mongoDb中写数据class Customer implements Runnable{ private Pool pool=null; MongoCollection
collection = null; public Customer(Pool pool,MongoCollection
collection){ this.pool = pool; this.collection = collection; } @Override public void run() { while(true){ File f = pool.fetchFile(); if(null==f){ return ; } try { saveToMonGoDb(f);// if(pool.hasUploadToDB%1000==0)// System.out.println("已写入数据:"+pool.hasUploadToDB); } catch (MongoWriteException e) { System.out.println("写入数据库异常:"+e.getMessage()); return ; } if(pool.getSize()==0){ System.out.println(Thread.currentThread().getName()+" :WRITTING FINISHED!!"); MongodbBatchInsetUtils.ThreadSizeCount++; } } } //将文件以文件名为id,文件内容为值保存在数据库中 private void saveToMonGoDb(File file){ String _id = file.getName().substring(0,file.getName().lastIndexOf(".")); String content = readFileContext(file, MongodbBatchInsetUtils.CHARSET); Document document = new Document("_id",_id).append("content", content); collection.insertOne(document); } //读取文件内容,以charSet编码处理 public static String readFileContext(File file,String charSet) { StringBuilder sb; BufferedReader reader=null; try { reader = new BufferedReader(new InputStreamReader(new FileInputStream(file), charSet)); String line = null; sb = new StringBuilder(); while(null!=(line = reader.readLine())){ sb.append(line+"\n"); } return sb.toString(); }catch (Exception e) { System.out.println("文件读取失败!"+e.getMessage()); }finally{ try { reader.close(); } catch (IOException e) { e.printStackTrace(); } } return null; } }//池,缓冲区class Pool{ volatile int size=0;//缓冲区中条目数量 volatile int limit =1000; volatile int hasUploadToDB=0; volatile private List
files = new LinkedList
(); //入栈 public synchronized void putFile(File file){ while(files.size()==limit){ try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } files.add(file); notifyAll(); ++size; } //出栈 public synchronized File fetchFile(){ while(files.size()==0 ){ try { this.wait(); } catch (InterruptedException e) { return null; } } File file = null; notify(); if(files.size()>0){ file = files.remove(0); --size; ++hasUploadToDB; } return file; } public int getSize(){ return this.size; } }

 

转载于:https://www.cnblogs.com/alienSmoking/p/5422675.html

你可能感兴趣的文章
Linux epoll 笔记(高并发事件处理机制)
查看>>
shell脚本练习01
查看>>
WPF图标拾取器
查看>>
通过取父级for循环的i来理解闭包,iife,匿名函数
查看>>
HDU 3374 String Problem
查看>>
数据集
查看>>
打印python包含汉字报SyntaxError: Non-ASCII character '\xe4' in file
查看>>
[Leetcode] unique paths ii 独特路径
查看>>
HDU 1217 Arbitrage (Floyd + SPFA判环)
查看>>
IntelliJ idea学习资源
查看>>
Django Rest Framework -解析器
查看>>
ExtJs 分组表格控件----监听
查看>>
Hibernate二级缓存配置
查看>>
LoadRunner常用术语
查看>>
关于jedis2.4以上版本的连接池配置,及工具类
查看>>
记忆讲师石伟华微信公众号2017所有文章汇总(待更新)
查看>>
mechanize (1)
查看>>
FactoryBean
查看>>
Coolite动态加载CheckboxGroup,无法在后台中获取
查看>>
如何在我们项目中利用开源的图表(js chart)
查看>>