Camel的数据转换

时间:2024-04-03 17:01:41

Camel的数据转换

在做系统集成的时候,必不可少的任务就是将数据从一种格式转换为另一种格式,再把转换后的格式发到目标系统:

Camel的数据转换

Camel提供的Message translator可以分为:

■ Using a Processor
■ Using beans
■ Using <transform>

1,利用processor的方式在Apache Camel框架入门示例 已经有个介绍.blog.csdn.net/kkdelta/article/details/7231640

通过实现Processor的方法process(Exchange exchange),通常要做的步骤是,从inbound消息里得到数据,转换,再放到outbound的消息里.

XXX data = exchange.getIn().getBody(XXX.class);

//转换

exchange.getOut().setBody(yyyObject);

2,利用bean的方式,写一个java bean如下:

public class OrderToCsvBean {
    public String map(String custom) {
        String id = custom.substring(0, 9);
        String customerId = custom.substring(10, 19);
        String date = custom.substring(20, 29);
        String items = custom.substring(30);
        String[] itemIds = items.split("@");

        StringBuilder csv = new StringBuilder();
        csv.append(id.trim());
        csv.append(",").append(date.trim());
        csv.append(",").append(customerId.trim());
        for (String item : itemIds) {
            csv.append(",").append(item.trim());
        }
        return csv.toString();
    }
}

在route里的代码如下:

from("file:d:/temp/inbox1/?delay=30000").bean(new OrderToCsvBean()).to("file:d:/temp/outbox1");

Camel有一套算法来选择调用bean里的什么方法(比如说message的header里是否通过CamelBeanMethodName设置了方法名称,bean是否只有一个方法,某一个方法是否加了@Handler注解等等),这里的bean只有一个map方法,Camel会调用这个方法.

3,用route中已经定义好的transform方法,一般用的较少.

from("direct:start").transform(body().regexReplaceAll("\n", "<br/>")).to("mock:result");

4,Camel集成对许多常用的数据格式的解析:通过调用unmarshal方法可以得到解析后的结果.

CSV:

from("file:d:/temp/inbox1/?delay=30000").unmarshal().csv().process(processor).to("file:d:/temp/outbox1");

processor里可以直接得到解析后的数据:

public class CSVProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        System.out.println("process...");
        List csvData = (List) exchange.getIn().getBody();
        StringBuffer strbf = new StringBuffer();
        if (csvData.get(0).getClass().equals(String.class)) {//single line CSV            
            strbf.append(parseOrder(csvData));
        } else {//multiple line CSV
            for (List<String> csvOrder : (List<List<String>>) csvData) {
                strbf.append(parseOrder(csvOrder));
            }
        }
        exchange.getOut().setBody(strbf.toString());
    }

    private String  parseOrder(List<String> csvData) {        
        System.out.println(csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2));
        return csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2);
    }
}


Apache Camel框架之HTTP路由

继介绍完Camel如何处理FTP,JMS接口之后,今天介绍一下系统集成的时候经常遇到的另一个接口,HTTP,一个示例需求如下图所示:(图片来源于Camel in Action)

Camel的数据转换

本文给出一个简单的代码示例如何用Camel来实现这样一个应用:

1,在一个JAVA类里配置如下路由:这里只示例了HTTP的部分,其他功能实现可以参见Apache Camel框架系列的其他博客.

[java] view plain copy
  1. public class HttpPollWithQuartzCamel {  
  2.     public static void main(String args[]) throws Exception {  
  3.         CamelContext context = new DefaultCamelContext();  
  4.         context.addRoutes(new RouteBuilder() {  
  5.             public void configure() {                  
  6.                 from("quartz://report?cron=10 * * * * ?&stateful=true")  
  7.                 .to("http://localhost:8080/prjWeb/test.camelreq")  
  8.                 .to("file:d:/temp/outbox?fileName=http.csv");  
  9.                 );  
  10.             }  
  11.         });  
  12.         context.start();  
  13.         boolean loop = true;  
  14.         while (loop) {  
  15.             Thread.sleep(25000);  
  16.         }  
  17.         context.stop();  
  18.     }  
  19. }  

对上面代码的简单解释: from("quartz://report?cron=10 * * * * ?&stateful=true"),配置一个quartz Job,每隔10秒发送一个HTTP request,将收到的内容保存为文件.

这里的http url可以是任何可以访问的http url,如果在http访问时候需要代理可以这么配置:"http://www.baidu.com?proxyHost=proxy.xxx.com&proxyPort=8080"

这个例子需要用到quartz,和httpclient等jar包,可以从这里下载: http://download.csdn.net/detail/kkdelta/4051072


Apache Camel框架集成Spring

Apache Camel提供了和Spring的集成,通过Spring容器(ApplicationContext)来管理Camel的CamelContext,这样的话,就不需要写代码来控制CamelContext的初始化,启动和停止了.Camel会随着Spring的启动而启动起来.

本文将Apache Camel框架入门示例(http://blog.csdn.net/kkdelta/article/details/7231640)中的例子集成到Spring中,下面简单介绍一下集成的基本步骤.

1,新建一个Eclipse工程,将Spring3的jar包,和Camel的jar包配置到工程的classpath.

2,Route类要继承RouteBuilde,如下

[java] view plain copy
  1. public class FileProcessWithCamelSpring extends RouteBuilder {  
  2.     @Override  
  3.     public void configure() throws Exception {  
  4.         FileConvertProcessor processor = new FileConvertProcessor();   
  5.         from("file:d:/temp/inbox?delay=30000").process(processor).to("file:d:/temp/outbox");          
  6.     }  
  7. }  

3,Processor仍然和和入门示例的代码相同.

[java] view plain copy
  1. public class FileConvertProcessor implements Processor{  
  2.     @Override  
  3.     public void process(Exchange exchange) throws Exception {      
  4.         try {  
  5.             InputStream body = exchange.getIn().getBody(InputStream.class);  
  6.             BufferedReader in = new BufferedReader(new InputStreamReader(body));  
  7.             StringBuffer strbf = new StringBuffer("");  
  8.             String str = null;  
  9.             str = in.readLine();  
  10.             while (str != null) {                  
  11.                 System.out.println(str);  
  12.                 strbf.append(str + " ");  
  13.                 str = in.readLine();                  
  14.             }  
  15.             exchange.getOut().setHeader(Exchange.FILE_NAME, "converted.txt");  
  16.             // set the output to the file  
  17.             exchange.getOut().setBody(strbf.toString());  
  18.         } catch (IOException e) {  
  19.             e.printStackTrace();  
  20.         }  
  21.     }  
  22. }  

4,创建一个Spring的配置文件如***意要将camel的xmlns加入文件中

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.     xmlns:camel="http://camel.apache.org/schema/spring"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"  
  7.     default-autowire="byName"  default-init-method="init">  
  8.     <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">  
  9.         <package>com.test.camel</package>  
  10.     </camelContext>      
  11. </beans>  

5,启动Spring容器,Camel会自动启动,不用像入门示例那样CamelContext context = new DefaultCamelContext(), context.addRoutes(..); context.start();

        ApplicationContext ac = new ClassPathXmlApplicationContext("config/cameltest.xml");
        while (true) {
            Thread.sleep(2000);
        }

可见,Camel可以很容易的和Spring集成.

Camel还提供了"Spring DSL"来在XML中配置Route规则,不需要用JAVA类(如上面的FileProcessWithCamelSpring )来实现route.

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.     xmlns:camel="http://camel.apache.org/schema/spring"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"  
  7.     default-autowire="byName"  default-init-method="init">  
  8.     <bean id="fileConverter" class="com.test.camel.FileConvertProcessor"/>  
  9.     <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">  
  10.         <route>  
  11.             <from uri="file:d:/temp/inbox?delay=30000"/>  
  12.             <process ref="fileConverter"/>  
  13.             <to uri="file:d:/temp/outbox"/>  
  14.         </route>  
  15.     </camelContext>  
  16. </beans>  

与第五步一样启动Spring容器,Camel会每隔30秒轮询一下看d:/temp/inbox是否有文件,有的话则进行处理.

Apache Camel Route节点的消息载体Exchange

在Camel的route中,消息在Route的各个节点中是以Exchange的形式传递的,所以对Exchange结构的理解对使用Camel来说是很重要的.
Exchange ID 如果不指定,Camel会默认设置一个,可以用来标识一个route的一次执行.
MEP message exchange pattern,有InOnly和InOut方式.
Exception 但route出异常的时候,抛出的异常赋值给这个变量(但在示例中似乎不是这样?).
In message 上一个节点传入的内容,是mandatory.的.
Out message 当MEP 是InOut的时候才用,不是mandatory.的.
Headers 键值对<String Object>,在下一个节点可以再取出来.
Attachments 在调用web service或者发邮件的时候放附件.
Body 消息内容,java对象.
Exchange的结构如下图所示:

Camel的数据转换

Exchange可以直接作为参数在route用到的方法中使用,如果route中的方法不是Exchange,Camel会根据一套规则将Exchange中的Body转换成该方法的参数类型.

这个结构里的各部分内容可以像下面的代码示例的方式进行访问:

[java] view plain copy
  1.     from("file:d:/temp/inbox?delay=3000")  
  2.     .bean(new ExchangeManipulateBean(),"implicitConvert")  
  3.     .bean(new ExchangeManipulateBean(),"traverseExchange")  
  4.     .to("file:d:/temp/outbox");  
  5.   
  6. public class ExchangeManipulateBean {  
  7.     private static Logger log = Logger.getLogger(ExchangeManipulateBean.class);  
  8.     public void implicitConvert(String msg){          
  9.             log.debug("implicitConvert: " + msg);          
  10.     }  
  11.     public void traverseExchange(Exchange exchange){  
  12.         log.debug("exchange.getExchangeId() " + exchange.getExchangeId());  
  13.         log.debug("exchange.getPattern() " + exchange.getPattern());  
  14.         log.debug("exchange.getException() " + exchange.getException());  
  15.           
  16.         Map<String, Object> props = exchange.getProperties();  
  17.         for(String key:props.keySet()){  
  18.             log.debug("props: " + key + " " + props.get(key));  
  19.         }  
  20.           
  21.         Message in = exchange.getIn();//in always null,but can be accessed without exception  
  22.           
  23.         log.debug("exchange.getIn() " + in);  
  24.         log.debug("in.getMessageId() " + in.getMessageId() );  
  25.         log.debug("in.getClass().getName() " + in.getClass().getName());  
  26.   
  27.         Map<String, Object> headers = in.getHeaders();  
  28.         for(String key:headers.keySet()){  
  29.             log.debug("headers: " + key + " " + headers.get(key));  
  30.         }  
  31.           
  32.         Object body = in.getBody();  
  33.         log.debug("body " + body  + " " + body.getClass().getName());  
  34.         log.debug("in.getAttachmentNames() " + in.getAttachmentNames());  
  35.         //Message out = exchange.getOut();//whenever out is touched,information in headers are lost  
1* implicitConvert 的方法接收的参数是String,Camel会在运行的时候根据一定的规则将"In Message"的Body转换成String,这里是将文件的内容转成String,这是由Camel内部的TypeConverter完成的.
2* exchange.getException()不能得到Exception,需要通过这样的方式: Exception exception = (Exception) exchange.getProperty(Exchange.EXCEPTION_CAUGHT);
3* exchange.getOut()被调用后,还需要将 "In Message"的header里的内容在copy到 "Out message"中去.通常都绕过out往下一个节点传值,可以用这种方式:exchange.getIn().setBody("$contents"),
在下一个节点仍然可以得到内容,同时前面节点set在header中的内容也存在.

4*本人用的是Camel 2.7.5,不知道后续的版本会不会把这些"pitfall"给填上.

Camel的数据转换

在做系统集成的时候,必不可少的任务就是将数据从一种格式转换为另一种格式,再把转换后的格式发到目标系统:

Camel的数据转换

Camel提供的Message translator可以分为:

■ Using a Processor
■ Using beans
■ Using <transform>

1,利用processor的方式在Apache Camel框架入门示例 已经有个介绍.blog.csdn.net/kkdelta/article/details/7231640

通过实现Processor的方法process(Exchange exchange),通常要做的步骤是,从inbound消息里得到数据,转换,再放到outbound的消息里.

XXX data = exchange.getIn().getBody(XXX.class);

//转换

exchange.getOut().setBody(yyyObject);

2,利用bean的方式,写一个java bean如下:

public class OrderToCsvBean {
    public String map(String custom) {
        String id = custom.substring(0, 9);
        String customerId = custom.substring(10, 19);
        String date = custom.substring(20, 29);
        String items = custom.substring(30);
        String[] itemIds = items.split("@");

        StringBuilder csv = new StringBuilder();
        csv.append(id.trim());
        csv.append(",").append(date.trim());
        csv.append(",").append(customerId.trim());
        for (String item : itemIds) {
            csv.append(",").append(item.trim());
        }
        return csv.toString();
    }
}

在route里的代码如下:

from("file:d:/temp/inbox1/?delay=30000").bean(new OrderToCsvBean()).to("file:d:/temp/outbox1");

Camel有一套算法来选择调用bean里的什么方法(比如说message的header里是否通过CamelBeanMethodName设置了方法名称,bean是否只有一个方法,某一个方法是否加了@Handler注解等等),这里的bean只有一个map方法,Camel会调用这个方法.

3,用route中已经定义好的transform方法,一般用的较少.

from("direct:start").transform(body().regexReplaceAll("\n", "<br/>")).to("mock:result");

4,Camel集成对许多常用的数据格式的解析:通过调用unmarshal方法可以得到解析后的结果.

CSV:

from("file:d:/temp/inbox1/?delay=30000").unmarshal().csv().process(processor).to("file:d:/temp/outbox1");

processor里可以直接得到解析后的数据:

public class CSVProcessor implements Processor {
    @Override
    public void process(Exchange exchange) throws Exception {
        System.out.println("process...");
        List csvData = (List) exchange.getIn().getBody();
        StringBuffer strbf = new StringBuffer();
        if (csvData.get(0).getClass().equals(String.class)) {//single line CSV            
            strbf.append(parseOrder(csvData));
        } else {//multiple line CSV
            for (List<String> csvOrder : (List<List<String>>) csvData) {
                strbf.append(parseOrder(csvOrder));
            }
        }
        exchange.getOut().setBody(strbf.toString());
    }

    private String  parseOrder(List<String> csvData) {        
        System.out.println(csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2));
        return csvData.get(0) + "--" + csvData.get(1)+"--" +csvData.get(2);
    }
}




Apache Camel提供了和Spring的集成,通过Spring容器(ApplicationContext)来管理Camel的CamelContext,这样的话,就不需要写代码来控制CamelContext的初始化,启动和停止了.Camel会随着Spring的启动而启动起来.

本文将Apache Camel框架入门示例(http://blog.csdn.net/kkdelta/article/details/7231640)中的例子集成到Spring中,下面简单介绍一下集成的基本步骤.

1,新建一个Eclipse工程,将Spring3的jar包,和Camel的jar包配置到工程的classpath.

2,Route类要继承RouteBuilde,如下

[java] view plain copy
  1. public class FileProcessWithCamelSpring extends RouteBuilder {  
  2.     @Override  
  3.     public void configure() throws Exception {  
  4.         FileConvertProcessor processor = new FileConvertProcessor();   
  5.         from("file:d:/temp/inbox?delay=30000").process(processor).to("file:d:/temp/outbox");          
  6.     }  
  7. }  

3,Processor仍然和和入门示例的代码相同.

[java] view plain copy
  1. public class FileConvertProcessor implements Processor{  
  2.     @Override  
  3.     public void process(Exchange exchange) throws Exception {      
  4.         try {  
  5.             InputStream body = exchange.getIn().getBody(InputStream.class);  
  6.             BufferedReader in = new BufferedReader(new InputStreamReader(body));  
  7.             StringBuffer strbf = new StringBuffer("");  
  8.             String str = null;  
  9.             str = in.readLine();  
  10.             while (str != null) {                  
  11.                 System.out.println(str);  
  12.                 strbf.append(str + " ");  
  13.                 str = in.readLine();                  
  14.             }  
  15.             exchange.getOut().setHeader(Exchange.FILE_NAME, "converted.txt");  
  16.             // set the output to the file  
  17.             exchange.getOut().setBody(strbf.toString());  
  18.         } catch (IOException e) {  
  19.             e.printStackTrace();  
  20.         }  
  21.     }  
  22. }  

4,创建一个Spring的配置文件如***意要将camel的xmlns加入文件中

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.     xmlns:camel="http://camel.apache.org/schema/spring"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"  
  7.     default-autowire="byName"  default-init-method="init">  
  8.     <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">  
  9.         <package>com.test.camel</package>  
  10.     </camelContext>      
  11. </beans>  

5,启动Spring容器,Camel会自动启动,不用像入门示例那样CamelContext context = new DefaultCamelContext(), context.addRoutes(..); context.start();

        ApplicationContext ac = new ClassPathXmlApplicationContext("config/cameltest.xml");
        while (true) {
            Thread.sleep(2000);
        }

可见,Camel可以很容易的和Spring集成.

Camel还提供了"Spring DSL"来在XML中配置Route规则,不需要用JAVA类(如上面的FileProcessWithCamelSpring )来实现route.

[html] view plain copy
  1. <?xml version="1.0" encoding="UTF-8"?>  
  2. <beans xmlns="http://www.springframework.org/schema/beans"  
  3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"   
  4.     xmlns:camel="http://camel.apache.org/schema/spring"  
  5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd  
  6.     http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"  
  7.     default-autowire="byName"  default-init-method="init">  
  8.     <bean id="fileConverter" class="com.test.camel.FileConvertProcessor"/>  
  9.     <camelContext id="testCamelContext" xmlns="http://camel.apache.org/schema/spring">  
  10.         <route>  
  11.             <from uri="file:d:/temp/inbox?delay=30000"/>  
  12.             <process ref="fileConverter"/>  
  13.             <to uri="file:d:/temp/outbox"/>  
  14.         </route>  
  15.     </camelContext>  
  16. </beans>  

与第五步一样启动Spring容器,Camel会每隔30秒轮询一下看d:/temp/inbox是否有文件,有的话则进行处理.