mongodb 的简单应用

时间:2021-04-30 20:34:30

1.mongodb的连接类

import org.apache.log4j.Logger;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;
/**
 * Helper that creates the MongoDB client, hands out the working database and closes the
 * client again.
 *
 * <p>The server address, port and database name are the constants declared below. Every
 * failure is reported through the class logger and signalled to the caller with a
 * {@code null} return value; no method of this class throws on failure.
 */
public class MongoHelper {
    static Logger logger = Logger.getLogger(MongoHelper.class);
    static final String DBName = "wb";
    static final String ServerAddress = "localhost";
    static final int PORT = 27017;

    public MongoHelper() {
    }

    /**
     * Creates a client for {@link #ServerAddress}:{@link #PORT}.
     *
     * @return the new client, or {@code null} when the client could not be created
     *         (the cause is logged)
     */
    public MongoClient getMongoClient() {
        try {
            MongoClient mongoClient = new MongoClient(ServerAddress, PORT);
            logger.info("======================Connect to mongodb successfully");
            return mongoClient;
        } catch (Exception e) {
            // Keep the stack trace in the log instead of printing only the message to stderr.
            logger.error("Failed to create MongoClient for " + ServerAddress + ":" + PORT, e);
            return null;
        }
    }

    /**
     * Looks up the {@link #DBName} database on the given client.
     *
     * @param mongoClient client returned by {@link #getMongoClient()}; may be {@code null}
     * @return the database handle, or {@code null} when {@code mongoClient} is {@code null}
     *         or the lookup failed (both cases are logged)
     */
    public MongoDatabase getMongoDataBase(MongoClient mongoClient) {
        // A plain guard replaces the former throw-and-immediately-catch construct.
        if (mongoClient == null) {
            logger.error("MongoClient不能够为空");
            return null;
        }
        try {
            MongoDatabase mongoDataBase = mongoClient.getDatabase(DBName);
            logger.info("=================Connect to DataBase successfully");
            return mongoDataBase;
        } catch (Exception e) {
            logger.error("Failed to get database " + DBName, e);
            return null;
        }
    }

    /**
     * Closes the client if there is one.
     *
     * @param mongoDataBase unused; kept so existing callers keep compiling
     * @param mongoClient   client to close; {@code null} is tolerated
     */
    public void closeMongoClient(MongoDatabase mongoDataBase, MongoClient mongoClient) {
        // Assigning null to the parameters would not affect the caller's variables,
        // so the only real work here is closing the client.
        if (mongoClient == null) {
            logger.info("No MongoClient to close");
            return;
        }
        mongoClient.close();
        logger.info("===============CloseMongoClient successfully");
    }
}

2.mongodb的简单插入

(1).每次从库里查询出来100条数据进行处理

(2).处理数据

(3).切库

(4).查询

(5).插入到另一个库

import java.util.ArrayList;
import net.sf.json.JSONObject;
import org.apache.log4j.Logger;
import org.bson.Document;
import org.bson.conversions.Bson;
import com.mongodb.BasicDBObject;
import com.mongodb.Block;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;

/**
 * Walks the {@code wbid} collection page by page and, for every user found there, inserts one
 * summary document (user, number of posts, number of fans) into {@code wbfans}. Post count and
 * fan count are read from {@code wbData}.
 *
 * <p>Usage: {@code fanMain <skip>}, where {@code skip} is the number of {@code wbid} documents
 * to skip before processing starts.
 */
public class fanMain {
    private final static Logger logger = Logger.getLogger(fanMain.class);

    /** Number of wbid documents fetched per query. */
    private static final int PAGE_SIZE = 100;

    /**
     * Program entry point.
     *
     * @param args {@code args[0]} is the initial skip offset into {@code wbid}
     * @throws IllegalArgumentException when the offset is missing or not an integer
     */
    public static void main(String[] args) {
        // Validate the argument before any connection is opened, so a bad invocation
        // cannot leave an unclosed client behind.
        if (args == null || args.length < 1) {
            throw new IllegalArgumentException("Usage: fanMain <skip>");
        }
        int skip = Integer.parseInt(args[0]);
        logger.info("===skip:" + skip);

        MongoHelper mongoHelper = new MongoHelper();
        MongoClient mongoClient = mongoHelper.getMongoClient();
        MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient);
        try {
            if (mongoDatabase == null) {
                logger.error("No database available, nothing processed");
                return;
            }
            MongoCollection<Document> idCollection = mongoDatabase.getCollection("wbid");
            MongoCollection<Document> dataCollection = mongoDatabase.getCollection("wbData");
            MongoCollection<Document> fansCollection = mongoDatabase.getCollection("wbfans");

            while (true) {
                logger.info("====ture skip:" + skip);
                boolean pageHadData = false;
                // try-with-resources closes the cursor even when a document fails half-way.
                try (MongoCursor<Document> page =
                        idCollection.find().skip(skip).limit(PAGE_SIZE).iterator()) {
                    while (page.hasNext()) {
                        pageHadData = true;
                        summarizeUser(page.next(), dataCollection, fansCollection);
                    }
                }
                if (!pageHadData) {
                    logger.info("======要退出了!!!");
                    break;
                }
                skip += PAGE_SIZE;
                logger.info("==循环结束=skip变化:" + skip);
            }
        } catch (Exception e) {
            logger.error("Processing aborted at skip " + skip, e);
        } finally {
            mongoHelper.closeMongoClient(mongoDatabase, mongoClient);
        }
    }

    /**
     * Builds and inserts the summary document for one {@code wbid} entry. A failure is logged
     * and only skips this user; it does not stop the surrounding run.
     *
     * @param idDoc          document read from {@code wbid}; its {@code yongh} field names the user
     * @param dataCollection the {@code wbData} collection
     * @param fansCollection the {@code wbfans} collection that receives the summary
     */
    private static void summarizeUser(Document idDoc, MongoCollection<Document> dataCollection,
            MongoCollection<Document> fansCollection) {
        Object rawUser = idDoc.get("yongh");
        if (rawUser == null) {
            logger.warn("wbid document without yongh skipped: " + idDoc.get("_id"));
            return;
        }
        String yongh = String.valueOf(rawUser);
        logger.info("=============yonghu:" + yongh);
        try {
            Bson byUser = Filters.eq("yongh", yongh);
            long postCount = dataCollection.count(byUser);
            logger.info("======post_count:" + postCount);

            // first() yields null when nothing matches and leaves no open cursor behind.
            Document firstPost = dataCollection.find(byUser).first();
            if (firstPost == null) {
                logger.warn("No wbData document for yongh " + yongh + ", skipped");
                return;
            }
            Object nfans = firstPost.get("nfans");
            if (!(nfans instanceof Number)) {
                logger.warn("nfans missing or not numeric for yongh " + yongh + ", skipped");
                return;
            }
            long fans = ((Number) nfans).longValue();
            logger.info("===fans:" + fans);

            // The document is deliberately built through JSON text and Document.parse, exactly
            // as before, so the value types stored in wbfans do not change for later readers.
            JSONObject json = new JSONObject();
            json.put("yongh", yongh);
            json.put("post_count", postCount);
            json.put("fans", fans);
            fansCollection.insertOne(Document.parse(json.toString()));
            logger.info("!!!****************!insert===插入fans成功!!!!");
        } catch (Exception e) {
            logger.error("Failed to summarize yongh " + yongh, e);
        }
    }
}

3.此方法和2中的效果一样,只是实现方式不同

import java.awt.List;
import java.util.ArrayList;
import java.util.Map;

import net.sf.json.JSONObject;

import org.apache.log4j.Logger;
import org.bson.Document;
import org.bson.conversions.Bson;

import com.mongodb.Block;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;

/**
 * Variant of the wbfans builder that counts a user's posts by iterating the matching
 * {@code wbData} documents instead of issuing a count query.
 *
 * <p>For every user in {@code wbid} one document with {@code yongh}, {@code post_count} and
 * {@code fans} is inserted into {@code wbfans}; {@code fans} is the {@code nfans} value of the
 * last {@code wbData} document iterated for that user.
 */
public class countFansMain {
    private final static Logger logger = Logger.getLogger(countFansMain.class);

    /** Number of wbid documents fetched per query. */
    private static final int PAGE_SIZE = 100;

    /**
     * Program entry point.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        MongoHelper mongoHelper = new MongoHelper();
        MongoClient mongoClient = mongoHelper.getMongoClient();
        MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient);
        int skip = 0;
        try {
            if (mongoDatabase == null) {
                logger.error("No database available, nothing processed");
                return;
            }
            // One variable per collection; the original reused a single variable for all three.
            MongoCollection<Document> idCollection = mongoDatabase.getCollection("wbid");
            MongoCollection<Document> dataCollection = mongoDatabase.getCollection("wbData");
            MongoCollection<Document> fansCollection = mongoDatabase.getCollection("wbfans");

            while (true) {
                logger.info("=========切入wbid表");
                FindIterable<Document> page = idCollection.find().skip(skip).limit(PAGE_SIZE);
                // Counting while iterating replaces the extra cursor that was opened only to
                // test for emptiness and was never closed.
                int seen = 0;
                for (Document idDoc : page) {
                    seen++;
                    summarizeUser(idDoc.get("yongh"), dataCollection, fansCollection);
                }
                if (seen == 0) {
                    logger.info("======要退出了!!!");
                    break;
                }
                skip += PAGE_SIZE;
                logger.info("====skip变化:" + skip);
            }
        } catch (Exception e) {
            logger.error("Processing aborted at skip " + skip, e);
        } finally {
            mongoHelper.closeMongoClient(mongoDatabase, mongoClient);
        }
    }

    /**
     * Counts the {@code wbData} documents of one user and inserts the summary into
     * {@code wbfans}. A failure is logged and only skips this user.
     *
     * @param yongh          value of the {@code yongh} field taken from {@code wbid}
     * @param dataCollection the {@code wbData} collection
     * @param fansCollection the {@code wbfans} collection that receives the summary
     */
    private static void summarizeUser(Object yongh, MongoCollection<Document> dataCollection,
            MongoCollection<Document> fansCollection) {
        logger.info("===yonghu:" + yongh);
        try {
            FindIterable<Document> posts = dataCollection.find(new Document("yongh", yongh));
            int count = 0;
            Object fs = null;
            for (Document post : posts) {
                fs = post.get("nfans");
                count++;
            }
            // Users without any wbData document are skipped, as before; previously this
            // surfaced as an exception from calling next() on an empty cursor.
            if (count == 0) {
                logger.warn("No wbData document for yongh " + yongh + ", skipped");
                return;
            }
            logger.info("====fans:" + fs);
            logger.info("===count:" + count);

            JSONObject json = new JSONObject();
            json.put("yongh", yongh);
            json.put("post_count", count);
            json.put("fans", fs);
            fansCollection.insertOne(Document.parse(json.toString()));
            logger.info("!!!****************!insert===插入fans成功!!!!");
        } catch (Exception e) {
            logger.error("Failed to summarize yongh " + yongh, e);
        }
    }
}

4.mongodb的聚合函数应用,主要是应用 db.wbfans.aggregate([{$group : {_id : {post_count:"$post_count"}, sum_fans: {$sum: "$fans"}}}])

按照post_count分组,并对fans列求和。下面对 AggregateIterable<Document> iterable = collection.aggregate(pipeline).allowDiskUse(true); 中的 allowDiskUse(true) 进行解释:

如果你从事过大数据计算、数据统计等相关工作,应该知道每个计算任务(job或task)都会使用独立的、大小有限的内存空间。mongodb没有提供复杂的内存分配模型(任务调度算法),只是简单地限定每个stage最多使用100MB内存,如果超过此值将终止计算并返回error;为了支持较大数据集合的处理,我们可以指定“allowDiskUse”参数,将“溢出”的数据写入本地的临时文件中(临时的collection),这个参数我们通常需要设定为true。(参见下面的示例)

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;

import net.sf.json.JSONObject;

import org.apache.log4j.Logger;
import org.bson.Document;

import com.mongodb.MongoClient;
import com.mongodb.client.AggregateIterable;

import com.mongodb.client.MongoCollection;

import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.Filters;

/**
 * Groups {@code wbfans} by {@code post_count}, sums {@code fans} per group and stores, for
 * every group, the number of users, the fan total and the average fans per user in the
 * {@code vagFans} collection.
 */
public class avgFansMain {
    private final static Logger logger = Logger.getLogger(avgFansMain.class);

    /**
     * Program entry point.
     *
     * @param args not used
     */
    public static void main(String[] args) {
        MongoHelper mongoHelper = new MongoHelper();
        MongoClient mongoClient = mongoHelper.getMongoClient();
        MongoDatabase mongoDatabase = mongoHelper.getMongoDataBase(mongoClient);
        try {
            if (mongoDatabase == null) {
                logger.error("No database available, nothing processed");
                return;
            }
            MongoCollection<Document> fansCollection = mongoDatabase.getCollection("wbfans");
            logger.info("=========切入wbfans表");
            // Looked up once instead of once per aggregation result.
            MongoCollection<Document> avgCollection = mongoDatabase.getCollection("vagFans");

            List<Document> pipeline = Arrays.asList(new Document("$group",
                    new Document("_id", new Document("post_count", "$post_count"))
                            .append("sum_fans", new Document("$sum", "$fans"))));
            AggregateIterable<Document> groups =
                    fansCollection.aggregate(pipeline).allowDiskUse(true);

            int i = 0;
            for (Document group : groups) {
                i++;
                logger.info("****************第" + i + "条***************");
                Document groupId = (Document) group.get("_id");
                Object postCount = groupId == null ? null : groupId.get("post_count");
                Object sumFans = group.get("sum_fans");
                // Both values are read as Number: a direct (int) cast fails as soon as the
                // driver hands back a Long or Double, e.g. for a large sum.
                if (!(postCount instanceof Number) || !(sumFans instanceof Number)) {
                    logger.warn("Group skipped, non-numeric post_count or sum_fans: " + group);
                    continue;
                }
                logger.info("====post_count:" + postCount);
                logger.info("======sum_fans:" + sumFans);

                long userCount = fansCollection.count(Filters.eq("post_count", postCount));
                logger.info("=======sumPostCount:" + userCount);
                if (userCount == 0) {
                    // Avoids a NaN/Infinity average if the collection changed in between.
                    logger.warn("Group skipped, no users counted for post_count " + postCount);
                    continue;
                }
                double avgFans = ((Number) sumFans).doubleValue() / userCount;
                logger.info("======avgFans:" + avgFans);

                JSONObject json = new JSONObject();
                json.put("post_count_yongh", postCount);
                json.put("sum_yongh", userCount);
                json.put("sum_fans", sumFans);
                json.put("avg_fans", avgFans);
                avgCollection.insertOne(Document.parse(json.toString()));
                logger.info("!!!****************!insert===插入avg_fans成功!!!!");
            }
        } catch (Exception e) {
            logger.error("Aggregation of wbfans failed", e);
        } finally {
            mongoHelper.closeMongoClient(mongoDatabase, mongoClient);
        }
    }
}

推荐https://blog.csdn.net/xundh/article/details/49384393