kafkaStream解析json出错导致程序中断的解决方法

时间:2021-10-24 01:02:18

出错在 KStreamFlatMapValues 方法执行时,由于json异常数据无法解析,结果生成的值为null,报错信息如下:

2018-04-18 19:21:04,776 ERROR [app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1] com.gw.stream.KStream103.lambda$main$1(100) | 捕获到异常:hello world hello world king
Exception in thread "app-8629d547-bcf1-487b-85e5-07d7e135e1e3-StreamThread-1" java.lang.NullPointerException
at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:41)
at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.forward(AbstractProcessorContext.java:174)
at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:224)
at org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:94)
at org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:411)
at org.apache.kafka.streams.processor.internals.StreamThread.processAndMaybeCommit(StreamThread.java:918)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:798)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720)

问题解决方案:

  1. 对json解析的bean添加未知字段忽略


    import java.util.List;
    import com.fasterxml.jackson.annotation.JsonIgnoreProperties; @JsonIgnoreProperties(ignoreUnknown = true)
    public class Bean103 { private List<String> key1;
    private List<List<String>> key2; public void setKey1(List<String> key1) {
    this.key1 = key1;
    }
    public List<String> getKey1() {
    return key1;
    } public void setKey2(List<List<String>> key2) {
    this.key2 = key2;
    }
    public List<List<String>> getKey2() {
    return key2;
    }
    }
  2. 由于报空指针错误,所以解决空指针问题,即判断为null时创建一个空对象.

    return list == null ? new ArrayList<String>():list;
  3. 完整的示例代码如下:

    package com.gw.stream;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.log4j.Logger; import com.alibaba.fastjson.JSONObject; public class KStream103 { private static Logger log = Logger.getLogger(KStream103.class); public static void main(String[] args) { if(args.length < 6){
    log.error("错误:参数个数不正确[application_id bootstarp_server groupid source_topic target_topic auto_offset_reset]");
    return ;
    }
    String application_id=args[0];
    String bootstarp_server = args[1];
    String groupid = args[2];
    String source_topic = args[3];
    String target_topic = args[4];
    String auto_offset_reset = args[5]; Properties props = new Properties();
    // consumer group
    // 指定一个应用ID,会在指定的目录下创建文件夹,里面存放.lock文件
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, application_id);
    props.put(StreamsConfig.STATE_DIR_CONFIG, "./tmp/");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,bootstarp_server);
    // props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG,10485760);
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, auto_offset_reset);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); //自动提交
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
    //针对时间异常解决方法
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MyEventTimeExtractor.class); final String splitChar = "\001"; StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> textLines = builder.stream(source_topic); // 接收第一个topic
    textLines.flatMapValues(value -> { Bean103 bean103 = null;
    List<String> list = null; try { //这里是value的业务处理逻辑...最终返回的是一个list } catch (Exception e) {
    log.error("捕获到异常:" + value);
    log.error("error message:" + e.getMessage()); }
    return list == null ? new ArrayList<String>():list; }).filter((k,v)-> v !=null).map((k, v) -> new KeyValue<>(k, v))
    .to(target_topic, Produced.with(Serdes.String(), Serdes.String())); KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start(); } }

kafkaStream解析json出错导致程序中断的解决方法的更多相关文章

  1. PHP json&lowbar;decode 函数解析 json 结果为 NULL 的解决方法

    在做网站 CMS 模块时,对于模块内容 content 字段,保存的是 json 格式的字符串,所以在后台进行模块内容的编辑操作 ( 取出保存的数据 ) 时,需要用到 json_decode() 函数 ...

  2. Android Studio 解析json文件出现中文乱码解决方法

    作为一个Android开发初学者,好不容易找到解决方法,跟大家分享一下, 其实很简单,只要保持服务器上的文件(date2.json)与软件的编码方式一样就行. 我用的Android Studio是ut ...

  3. &period;NET MVC Json&lpar;&rpar;处理大数据异常解决方法

    [1-部分原文]: .NET MVC Json()处理大数据异常解决方法 整个项目采用微软的ASP.NET MVC3进行开发,前端显示采用EasyUI框架,图表的显示用的是Highcharts,主要进 ...

  4. XCode编译文件过多导致内存吃紧解决方法

    XCode编译文件过多导致内存吃紧解决方法 /Users/~~/Library/Developer/Xcode/DerivedData 1) 然后 找到编译文件 删除 就好了哦 快去试试看吧

  5. Jquery方法load之后导致js失效解决方法

    Jquery方法load之后导致js失效解决方法 >>>>>>>>>>>>>>>>>>&gt ...

  6. yum安装命令:遇到的问题报错如下: File &quot&semi;&sol;usr&sol;bin&sol;yum&quot&semi;&comma; line 30 except KeyboardInterrupt&comma; e&colon; 通过看报错可以了解到是使用了python2的语法,所以了解到当前yum使用的Python2,因为我单独安装了python3,且python3设置为默认版本了,所以导致语法问题 解决方法: 使用python2&period;6 yum install

    1.安装zip yum install -y unzip zip 2.安装lrszs yum -y install lrzsz 3.安装scp 遇到下面的问题: 结果提示: No package sc ...

  7. 阿里云提出的漏洞(Phpcms V9某处逻辑问题导致getshell漏洞解决方法)的问题

    最近从阿里云云盾检测流出来的,相比使用阿里云服务器的朋友已经收到漏洞提醒:Phpcms V9某处逻辑问题导致getshell漏洞解决方法,这个漏洞怎么办呢?CMSYOU在这里找到针对性解决办法分享给大 ...

  8. C&num; Winform频繁刷新导致界面闪烁解决方法

    C#Winform频繁刷新导致界面闪烁解决方法 一.通过对窗体和控件使用双缓冲来减少图形闪烁(当绘制图片时出现闪烁时,使用双缓冲) 对于大多数应用程序,.NET Framework 提供的默认双缓冲将 ...

  9. winform频繁刷新导致界面闪烁解决方法

    转自龙心文 原文 winform频繁刷新导致界面闪烁解决方法 一.通过对窗体和控件使用双缓冲来减少图形闪烁(当绘制图片时出现闪烁时,使用双缓冲) 对于大多数应用程序,.NET Framework 提供 ...

随机推荐

  1. 用shebang编写一个ssh自动登陆脚本

    单例模式是软件开发中非常普遍的一种模式.它的主要作用是确保系统中,始终只存在一个类的实例对象. 这样做的好处有两点: 1.对于需要频繁使用的对象,在每次使用时,如果都需要重新创建,并且这些对象的内容都 ...

  2. ViewPager,实现真正的无限循环(定时&plus;手动)

    利用定时器,实现循环轮播,很简单:只需在定时器的消息里加如下代码即可: int count = adapter.getCount(); if (count > 1) { // 多于1个,才循环 ...

  3. 智能指针 ADO数据库连接

    ADO库包含三个基本接口:_ConnectionPtr接口._CommandPtr接口和_RecordsetPtr接口._ConnectionPtr接口返回一个记录集或一个空指针.通常使用它来创建一个 ...

  4. Candence下对&OpenCurlyDoubleQuote;跨页连接器(off-page connector)”进行批量重命名的方法

    parts.ports.alias等等均可以在“属性编辑器(Property Editor)”中进行查看编辑,并通过复制到Excel等表格软件来进行批量修改.之后再粘贴回去的方法进行批量编辑.但是“跨 ...

  5. MongoDB 安装和配置

    [前言] Mongodb是一款nosql数据库,关于nosql 以及 mongodb本文不进行介绍,在数据库的选型方面,本人说是在机缘巧合之下选择了mongodb,并且拟使用mongodb搭建日志系统 ...

  6. PHP的ntohl网络字节序函数及相关知识

    PHP与C服务器的socket通信,在做数据转换的时候,PHP没有提供对应将网络字节序和机器字节序相互转换的程序,但是根据函数的意义,我们可以做相应的转换来实现这一函数: function ntohl ...

  7. C语言之统计输入字符数量

    这个程序市委了统计所输入的数字或者英文字母的数字的数量,当然稍加改动便可以统计特殊字符的个数,在此不再冗叙. 代码如下: #include <iostream> using namespa ...

  8. MT【257】任意存在并存

    函数$f(x)=\dfrac{4x}{x+1}(x>0),g(x)=\dfrac{1}{2}(|x-a|-|x-b|),(a<b)$, 若对任意$x_1>0$,存在$x_2\le x ...

  9. elasticsearch5安装

    环境: centos7 es 5.4.3 es安装 一.下载 wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsear ...

  10. Hibernate 查询技术

    转载: http://blog.csdn.net/u014078192/article/details/24986475 一.Hibernate的三种查询方式(掌握) Hibernate中提供了三种查 ...