1. 安装开发环境
1.1 Netty环境
这里我使用 Netty 5.0.0 版本,到 http://netty.io/ 下载 netty-all-5.0.0.Alpha2.jar 即可,这个 jar 包简单配置一下就可以使用。
1.2 Protobuf环境
这个就比较麻烦了,这里说一下我的做法。 可以在这里下载最新版https://github.com/google/protobuf 或者使用 v2.6.1稳定版 https://github.com/google/protobuf/tree/v2.6.1
也可以在这里下载http://pkgs.fedoraproject.org/repo/pkgs/protobuf/protobuf-2.6.1.tar.bz2/
在这里下载对应的Protobuf-java.jar http://central.maven.org/maven2/com/google/protobuf/protobuf-java/
http://mvnrepository.com/artifact/com.google.protobuf/protobuf-java
1.3 Protoc 工具
Linux和Windows都差不多,编译源代码即可。
以Windows为例,打开\protobuf-2.6.1\vsprojects\protobuf.sln
这样生成解决方案。
在Debug里面这些文件是有用的
2. protobuf初始化
SubscribeReq.proto
// Subscription request sent from the client to the server (proto2 syntax).
// The proto package becomes the C++ namespace of the generated code.
package netty;
// Java package that the generated Java source is placed in.
option java_package = "com.jieli.nettytest.protobuf";
// Name of the generated Java outer class; the message itself is declared on the same line.
option java_outer_classname = "SubscribeReqProto"; message SubscribeReq{
// Identifier of this subscription request.
required int32 subReqID = 1;
// Name of the subscribing user.
required string userName = 2;
// Name of the product being subscribed to.
required string productName = 3;
// Zero or more address strings.
repeated string address = 4;
}
SubscribeResq.proto
// Subscription response message (proto2 syntax).
// The proto package becomes the C++ namespace of the generated code.
package netty;
// Java package that the generated Java source is placed in.
option java_package = "com.jieli.nettytest.protobuf";
// Name of the generated Java outer class; the message itself is declared on the same line.
option java_outer_classname = "SubscribeResqProto"; message SubscribeResq{
// Identifier of the request this response refers to.
required int32 subReqID = 1;
// Result code of the request (the sample server handler sets 0).
required int32 respCode = 2;
// Human-readable description of the result.
required string desc = 3;
}
用protoc.exe进行编译
protoc.exe --java_out=. --cpp_out=. SubscribeReq.proto
protoc.exe --java_out=. --cpp_out=. SubscribeResq.proto
3. Protobuf 测试
TestSubscribeReqProto.java
package com.jieli.nettytest.protobuf; import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * Round-trip check for the generated {@code SubscribeReq} message: builds a
 * request, serializes it to bytes, parses it back and compares the two objects.
 */
public class TestSubscribeReqProto {

    /**
     * Serializes a request to its protobuf wire format.
     *
     * @param req the message to encode
     * @return the encoded bytes
     */
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    /**
     * Parses a request from its protobuf wire format.
     *
     * @param body the encoded bytes
     * @return the decoded message
     * @throws InvalidProtocolBufferException if {@code body} is not a valid encoding
     */
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
            throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    /**
     * Builds the sample request used by the round-trip check.
     *
     * @return a fully populated request with three addresses
     */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder =
                SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("Lilinfeng");
        builder.setProductName("netty book");
        List<String> address = new ArrayList<>();
        address.add("NanJing YuHuaTai");
        address.add("beijin lilili");
        address.add("asdfasdf");
        builder.addAllAddress(address);
        return builder.build();
    }

    /**
     * Runs the encode/decode round trip and prints both sides of it.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            SubscribeReqProto.SubscribeReq req = createSubscribeReq();
            System.out.println("befor encode:" + req.toString());
            SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
            // Print the DECODED message; printing the original again would
            // hide any difference introduced by the round trip.
            System.out.println("After decode :" + req2.toString());
            System.out.println("assert equal : ==>" + req2.equals(req));
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
    }
}
运行结果
项目目录结构
4. java-java通信例子(跟书本上是差不多一样的)
SubReqServer.java
package com.jieli.nettytest.protobuf; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty server that receives length-prefixed protobuf {@code SubscribeReq}
 * messages and hands them to {@link SubReqServerHandler}.
 */
public class SubReqServer {

    /**
     * Binds the server to {@code port} and blocks until the server channel is
     * closed. Any failure is printed to standard error; both event loop groups
     * are shut down before returning.
     *
     * @param port TCP port to listen on
     */
    public void bind(int port) {
        EventLoopGroup acceptors = new NioEventLoopGroup();
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.option(ChannelOption.SO_BACKLOG, 100);
            bootstrap.handler(new LoggingHandler(LogLevel.INFO));
            bootstrap.childHandler(newChildInitializer());

            ChannelFuture bound = bootstrap.bind(port).sync();
            bound.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            acceptors.shutdownGracefully();
            workers.shutdownGracefully();
        }
    }

    /** Creates the initializer that installs the protobuf codec chain on each accepted channel. */
    private static ChannelInitializer<SocketChannel> newChildInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        // When talking to the C++ client the two Varint32 handlers
                        // must be commented out: that client sends the bare protobuf
                        // bytes without a varint32 length prefix. To keep automatic
                        // frame splitting, the C++ client has to add the prefix itself.
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(
                                SubscribeReqProto.SubscribeReq.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new SubReqServerHandler());
            }
        };
    }

    public static void main(String[] args) {
        SubReqServer server = new SubReqServer();
        server.bind(7777);
    }
}
SubReqServerHandler.java
package com.jieli.nettytest.protobuf; import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Server-side handler for decoded {@code SubscribeReq} messages: logs every
 * request whose user name is "Lilinfeng" (case-insensitive).
 */
public class SubReqServerHandler extends ChannelHandlerAdapter {

    /**
     * Handles one decoded request.
     *
     * @param ctx the channel context
     * @param msg the decoded message; expected to be a {@code SubscribeReq}
     *            produced by the upstream {@code ProtobufDecoder}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        SubscribeReqProto.SubscribeReq req = (SubscribeReqProto.SubscribeReq) msg;
        if ("Lilinfeng".equalsIgnoreCase(req.getUserName())) {
            System.out.println("Service accept client subscribe req:[" + req.toString() + "]");
            // NOTE(review): replying is disabled here, so resp() is currently unused.
            // Presumably it was switched off for the C++ client experiment - confirm.
            // To acknowledge the request, re-enable:
            //   ctx.writeAndFlush(resp(req.getSubReqID()));
        }
    }

    /**
     * Builds the success response for a request.
     *
     * @param subReqID identifier of the request being acknowledged
     * @return a response with code 0 and a fixed description
     */
    private SubscribeResqProto.SubscribeResq resp(int subReqID) {
        SubscribeResqProto.SubscribeResq.Builder builder =
                SubscribeResqProto.SubscribeResq.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode(0);
        builder.setDesc("Netty book order success..");
        return builder.build();
    }

    /**
     * Prints the failure and closes the channel, so a connection that hit a
     * decoding or I/O error is not left open.
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        cause.printStackTrace();
        ctx.close();
    }
}
SubReqClient.java
package com.jieli.nettytest.protobuf; import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

/**
 * Netty client that connects to the subscription server and exchanges
 * length-prefixed protobuf messages through {@link SubReqClientHandler}.
 */
public class SubReqClient {

    /**
     * Connects to {@code host:port} and blocks until the channel is closed.
     * Any failure is printed to standard error; the event loop group is shut
     * down before returning.
     *
     * @param port TCP port of the server
     * @param host host name or address of the server
     */
    public void connect(int port, String host) {
        EventLoopGroup workers = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workers);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(newInitializer());

            ChannelFuture connected = bootstrap.connect(host, port).sync();
            connected.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            workers.shutdownGracefully();
        }
    }

    /** Creates the initializer that installs logging and the protobuf codec chain. */
    private static ChannelInitializer<SocketChannel> newInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new LoggingHandler(LogLevel.INFO))
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(
                                SubscribeResqProto.SubscribeResq.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new SubReqClientHandler());
            }
        };
    }

    public static void main(String[] args) {
        SubReqClient client = new SubReqClient();
        client.connect(7777, "localhost");
    }
}
SubReqClientHandler.java
package com.jieli.nettytest.protobuf; import java.util.ArrayList;
import java.util.List; import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

/**
 * Client-side handler: sends a batch of subscription requests as soon as the
 * channel becomes active and prints every message received from the server.
 */
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Number of requests written when the channel becomes active. */
    private static final int REQUEST_COUNT = 10;

    public SubReqClientHandler() {
    }

    /** Queues requests with ids 0..9 and flushes them in one go. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int id = 0;
        while (id < REQUEST_COUNT) {
            ctx.write(subReq(id));
            id++;
        }
        ctx.flush();
    }

    /**
     * Builds one sample request.
     *
     * @param i value stored in the request's {@code subReqID} field
     * @return the populated request
     */
    private SubscribeReqProto.SubscribeReq subReq(int i) {
        return SubscribeReqProto.SubscribeReq.newBuilder()
                .setSubReqID(i)
                .setUserName("Lilinfeng")
                .setProductName("Netty Book..")
                .addAddress("NanJin LLLLLL")
                .addAddress("beijin lllllll")
                .addAddress("shenzhen jjjjjj")
                .build();
    }

    /** Prints the message received from the server. */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg)
            throws Exception {
        String line = "Receive server response:[" + msg + "]";
        System.out.println(line);
    }

    /** Flushes pending outbound data once the current read batch is done. */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
服务器运行结果
客户端运行结果
5. (C/C++)-java通信例子
本来想用 MinGW 来实现,但是试了几次总是编译不过,就放弃了,改用 VS2008 来编译。
新建一个控制台程序,配置下属性页, 下面这个图配置到protobuf源代码的src目录
下面这个图配置到用vs编译产生的中间文件,包含几个lib包的目录
将经过protoc.exe产生的*.h和*.cc文件放到对应的项目中
main.cpp代码
#include <iostream>
#include <windows.h>
#include "SubscribeReq.pb.h"
#include "SubscribeResq.pb.h" #pragma comment(lib, "ws2_32.lib")
#pragma comment(lib,"libprotobuf.lib")
#pragma comment(lib,"libprotobuf-lite.lib") using namespace std;
using namespace netty; //打开连接
// Opens a blocking TCP connection to host:port.
//
// host: dotted-decimal IPv4 address of the server (passed to inet_addr).
// port: TCP port of the server, in host byte order.
// Returns the connected socket, or INVALID_SOCKET (the same value as
// (SOCKET)-1) on any failure. On failure all resources acquired here are
// released again.
//
// NOTE: the numeric literals of the published listing were lost; the values
// below (Winsock 2.2, protocol 0, option value 1, error value -1) are the
// conventional ones and were reconstructed.
SOCKET open_msg(char *host, int port)
{
    // Initialise the Winsock DLL, requesting version 2.2.
    WSADATA wsaData;
    WORD socketVersion = MAKEWORD(2, 2);
    if (WSAStartup(socketVersion, &wsaData) != 0)
    {
        printf("Init socket dll error!");
        return INVALID_SOCKET;
    }

    // Create the socket. socket() reports failure with INVALID_SOCKET,
    // not SOCKET_ERROR.
    SOCKET s = socket(AF_INET, SOCK_STREAM, 0); // tcp
    if (INVALID_SOCKET == s)
    {
        printf("Create Socket Error!");
        WSACleanup();
        return INVALID_SOCKET;
    }

    // Address of the server.
    sockaddr_in server_addr = {0};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.S_un.S_addr = inet_addr(host);
    server_addr.sin_port = htons((u_short)port);

    // Disable Nagle's algorithm so small messages are sent immediately.
    BOOL opt = TRUE;
    int ret = setsockopt(s, IPPROTO_TCP, TCP_NODELAY, (const char *)&opt, sizeof(opt));
    if (ret == SOCKET_ERROR)
    {
        printf("ERROR\n");
        closesocket(s);
        WSACleanup();
        return INVALID_SOCKET;
    }

    // Connect.
    if (SOCKET_ERROR == connect(s, (LPSOCKADDR)&server_addr, sizeof(server_addr)))
    {
        printf("Can Not Connect To Server IP!\n");
        closesocket(s);
        WSACleanup();
        return INVALID_SOCKET;
    }
    return s;
}
// Receives up to `size` bytes from socket `s` into `msg`.
// Returns the number of bytes received (0 when the peer closed the
// connection), or -1 on error.
// NOTE: the flags argument (0) and the error value (-1) were lost in the
// published listing and have been reconstructed.
int recv_msg(SOCKET s,char *msg,int size)
{
    int ret = recv(s, msg, size, 0);
    if (ret == SOCKET_ERROR)
    {
        printf("Recv Error.\n");
        return -1;
    }
    return ret;
}
// Sends `size` bytes from `msg` over socket `s`.
// Returns the number of bytes actually sent, or -1 on error.
// NOTE: the flags argument (0) and the error value (-1) were lost in the
// published listing and have been reconstructed.
int send_msg(SOCKET s,char *msg,int size)
{
    int ret = send(s, msg, size, 0);
    if (ret == SOCKET_ERROR)
    {
        printf("Send Error.\n");
        return -1;
    }
    return ret;
}
//关闭连接
int close_msg(SOCKET s)
{
closesocket(s);
return ;
} int main()
{
SubscribeReq req ;
req.set_username("Lilinfeng");
req.add_address("asdf");
req.set_subreqid();
req.set_productname("laskjdfk111"); SOCKET s = open_msg("127.0.0.1", ); char msg[] = {};
req.SerializePartialToArray(msg, ); cout<<req.GetCachedSize()<<endl;
send_msg(s, msg, req.GetCachedSize()); close_msg(s); system("pause");
return ;
}
然后编译运行就可以发送protobuf对象到java服务器端,运行后服务器出现这个结果
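找了很久原因,原来是服务器 SubReqServer.java 中的 ProtobufVarint32FrameDecoder 和 ProtobufVarint32LengthFieldPrepender 会按"Varint32 长度前缀 + 消息体"的格式对 Protobuf 包进行拆包和封包,而 C++ 客户端发送的是不带长度前缀的原始 Protobuf 数据,导致两端格式不一致。解决的办法是注释掉这两行,不过这样又会产生书本上说到的问题:会出现粘包。我能想到的办法是:1. 在 C++ 客户端中进行修改,使发送的数据符合 Java 端的格式(即在消息体前加上 Varint32 长度),这个要看源代码;2. 在发送的包前面增加包头,用包头信息描述 Protobuf 消息的大小。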
参考资料
Netty权威指南 – 第八章 Google Protobuf 编解码
http://blog.csdn.net/majianfei1023/article/details/45371743
http://www.cnblogs.com/lidabo/p/3911456.html