用Java在客户端读取MongoDB中的数据并发送至服务器

时间:2022-09-17 19:58:48

使用Java标准库自带的Socket(套接字)来实现:客户端读取MongoDB集合中的文档,并通过TCP连接发送给监听8888端口的服务器。程序如下:

Client.java

package com.cn.gao;
import java.net.*;
import java.io.*;

import com.mongodb.*;
/**
 * Client that reads every document of a MongoDB collection and sends it to a
 * server over a plain TCP socket.
 *
 * @author hadoop
 */
public class Client {
    private Socket client;
    private boolean connected;

    /**
     * Opens a TCP connection to the server. Failures are reported on standard
     * output and recorded in the connection flag instead of being thrown, so
     * callers must check {@link #isConnected()} before sending.
     *
     * @param host server host name or address
     * @param port server TCP port
     */
    public Client(String host,int port){
        try {
            client = new Socket(host,port);
            System.out.println("连接服务器成功!");
            this.connected = true;
        } catch (UnknownHostException e) {
            System.out.println("无法解析主机名!");
            this.connected = false;
        } catch (IOException e) {
            // The Socket constructor threw, so no socket was assigned and
            // there is nothing to close here.
            System.out.println("输入输出错误!");
            this.connected = false;
        }
    }

    /**
     * Reports whether the connection is considered open.
     *
     * @return the current value of the connection flag
     */
    public boolean isConnected(){
        return connected;
    }

    /**
     * Overrides the connection flag.
     *
     * @param connected new value of the flag
     */
    public void setConnected(boolean connected){
        this.connected = connected;
    }

    /**
     * Reads all documents of a collection from the MongoDB instance at
     * {@code localhost:27017} and writes the string form of each one to the
     * socket. Documents are written back to back with no delimiter and are
     * encoded with the platform default charset.
     *
     * <p>The socket's output stream is closed when this method returns, which
     * also closes the socket, so a {@code Client} can send one collection.
     *
     * @param dbname name of the MongoDB database
     * @param collectionName name of the collection whose documents are sent
     */
    public void sendFile(String dbname,String collectionName){
        if(client==null) return;
        Mongo connection = null;
        DBCursor cur = null;
        DataOutputStream dos = null;
        try {
            connection = new Mongo("localhost:27017");
            DB db = connection.getDB(dbname);
            DBCollection input = db.getCollection(collectionName);
            // To restrict the result, use input.find(query, fields) instead,
            // e.g. with fields = new BasicDBObject("vtext", 2).
            cur = input.find();
            // One stream for the whole transfer rather than one per document.
            dos = new DataOutputStream(client.getOutputStream());
            while(cur.hasNext()){
                String text = cur.next().toString();
                // Echo exactly what is sent (no buffer padding or stale bytes).
                System.out.println(text);
                dos.write(text.getBytes());
                dos.flush();
            }
            System.out.println("传输成功!");
        } catch (IOException e) {
            e.printStackTrace();
            closeSocket();
        } finally{
            // Release the database resources even when the transfer failed.
            if(cur!=null) cur.close();
            if(connection!=null) connection.close();
            try {
                if(dos!=null) dos.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Closes the socket, if one was opened, and clears the connection flag.
     * Calling it more than once is harmless.
     */
    public void closeSocket(){
        connected = false;
        if(client!=null){
            try {
                client.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Command-line entry point. Connects to the server on port 8888 and sends
     * one collection.
     *
     * @param args exactly three values: {@code hostname dbname collectionName}
     */
    public static void main(String[] args){
        if(args.length!=3){
            System.out.println("参数格式不对!");
            return;
        }
        String hostName = args[0];
        String dbname = args[1];
        String collectionName = args[2];
        // The server port is fixed at 8888.
        int port = 8888;
        Client client = new Client(hostName,port);
        if(client.isConnected()){
            client.sendFile(dbname, collectionName);
            client.closeSocket();
        }
    }
}

Server.java

package com.cn.gao;
import java.io.*;
import java.net.*;

/**
 * TCP server that accepts client connections and prints everything each
 * client sends to standard output.
 *
 * @author hadoop
 */
public class Server {
    private int port;
    // Owned by this instance; created lazily by run().
    private ServerSocket server;

    /**
     * Creates a server for the given port. The port is not bound until
     * {@link #run()} is called.
     *
     * @param port TCP port to listen on
     */
    public Server(int port){
        this.port = port;
        this.server = null;
    }

    /**
     * Binds the listening socket and accepts clients forever, handing each
     * connection to its own {@link SocketConnection} thread. Returns only if
     * the port cannot be bound.
     */
    public void run(){
        if(server==null){
            try {
                server = new ServerSocket(port);
            } catch (IOException e) {
                e.printStackTrace();
                // Without a listening socket there is nothing to accept on.
                return;
            }
        }
        System.out.println("服务已启动...");
        while(true){
            try {
                Socket client = server.accept();
                // start(), not run(): the handler must execute on its own
                // thread so the accept loop can keep serving other clients.
                new SocketConnection(client).start();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    /**
     * Handler thread for one client connection: prints the received text
     * until the client closes the stream, then closes the socket.
     */
    public class SocketConnection extends Thread{
        private Socket client;

        /**
         * @param client the accepted client socket to read from
         */
        public SocketConnection(Socket client){
            this.client = client;
        }

        /**
         * Reads text from the client until end of stream and prints each
         * chunk on its own line. Bytes are decoded with the platform default
         * charset.
         */
        @Override
        public void run(){
            if(client==null) return;
            Reader in = null;
            try {
                // A Reader keeps multi-byte characters intact even when they
                // are split across two network reads.
                in = new InputStreamReader(client.getInputStream());
                int bufferSize = 10240;
                char[] buf = new char[bufferSize];
                int num;
                // Ends at end of stream, i.e. when the client closes its side.
                while((num=in.read(buf))!=-1){
                    // Print only the characters read, not the whole buffer.
                    System.out.println(new String(buf, 0, num));
                }
            } catch (IOException e) {
                e.printStackTrace();
            } finally{
                try {
                    if(in!=null)  in.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
                // Closed separately so a failure above cannot leak the socket.
                try {
                    client.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }


    /**
     * Starts the server on port 8888.
     *
     * @param args unused
     */
    public static void main(String[] args){
        int port = 8888;
        new Server(port).run();
    }
}