当前位置: 首页 > news >正文

kafka复习:(3)自定义序列化器和反序列化器

一、实体类定义:


public class Company {private String name;private String address;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getAddress() {return address;}public void setAddress(String address) {this.address = address;}@Overridepublic String toString() {return "Company{" +"name='" + name + '\'' +", address='" + address + '\'' +'}';}public Company(String name, String address) {this.name = name;this.address = address;}public Company() {}
}

二、自定义序列化器和反序列化器


import org.apache.kafka.common.serialization.Serializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanySerializer implements Serializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}//进行字节数组序列化@Overridepublic byte[] serialize(String topic, Company data) {if(data == null){return null;}byte[] name, address;try{if(data.getName() != null){name = data.getName().getBytes("UTF-8");}else {name = new byte[0];}if(data.getAddress() != null){address = data.getAddress().getBytes("UTF-8");}else{address = new byte[0];}ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4+ name.length + address.length);byteBuffer.putInt(name.length);byteBuffer.put(name);byteBuffer.putInt(address.length);byteBuffer.put(address);return byteBuffer.array();}catch (UnsupportedEncodingException e){e.printStackTrace();}return new byte[0];}@Overridepublic void close() {}
}

import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Map;public class CompanyDeserializer implements Deserializer<Company> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic Company deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteBuffer buffer = ByteBuffer.wrap(data);int nameLen, addressLen;String name, address;nameLen = buffer.getInt();byte[] nameBytes = new byte[nameLen];buffer.get(nameBytes);addressLen = buffer.getInt();byte[] addressBytes = new byte[addressLen];buffer.get(addressBytes);try {name = new String(nameBytes, "UTF-8");address = new String(addressBytes, "UTF-8");} catch (UnsupportedEncodingException ex) {throw new SerializationException("Error:"+ex.getMessage());}return new Company(name,address);}@Overridepublic void close() {}
}

三、定义生产者和消费者

package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;public class CompanyProducer {public static void main(String[] args) throws Exception{Properties properties = new Properties();properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//设置value的序列化器properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CompanySerializer.class.getName());properties.put("bootstrap.servers", "xxx.xxx.xxx.xxx:9092");KafkaProducer<String, Company> producer = new KafkaProducer<>(properties);Company company = new Company();company.setAddress("Beijing");company.setName("Connection");ProducerRecord<String, Company> record = new ProducerRecord<>("companyTopic", company);producer.send(record).get();}
}
package com.cisdi.dsp.modules.metaAnalysis.rest;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.Collections;
import java.util.Properties;public class CompanyConsumer {public static void main(String[] args) {Properties properties=new Properties();properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, CompanyDeserializer.class.getName());properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"xxx.xxx.xxx.xxx:9092");properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"my");KafkaConsumer<String,Company> kafkaConsumer=new KafkaConsumer<>(properties);kafkaConsumer.subscribe(Collections.singletonList("companyTopic"));while(true){ConsumerRecords<String,Company> consumerRecords=kafkaConsumer.poll(Duration.ofMillis(1000));for(ConsumerRecord<String,Company> consumerRecord: consumerRecords){System.out.println(consumerRecord.value());}}}
}
http://www.lryc.cn/news/136759.html

相关文章:

  • Unity 图片资源的适配
  • 【Axure高保真原型】通过输入框动态控制折线图
  • 【Java】树结构数据的搜索
  • ElementUI中的日历组件加载无效的问题
  • Git版本管理(03)stash临时操作和.gitignore配置
  • 【ThingJS | 3D可视化】开发框架,一站式数字孪生
  • SpringBoot返回响应排除为 null 的字段
  • 华为数通方向HCIP-DataCom H12-821题库(单选题:41-60)
  • OpenAI推出GPT-3.5Turbo微调功能并更新API;Midjourney更新局部绘制功能
  • 相机成像之3A算法的综述
  • 最新AI系统ChatGPT程序源码/微信公众号/H5端+搭建部署教程+完整知识库
  • OpenCV实例(九)基于深度学习的运动目标检测(二)YOLOv2概述
  • 【Docker】已经创建好的Docker怎么设置开机自启
  • E - Excellent Views
  • WiFi天线和NB-IoT天线不通用
  • IoT DC3 是一个基于 Spring Cloud 的开源的、分布式的物联网(IoT)平台本地部署步骤
  • VBA Excel自定义函数的使用 简单的语法
  • 字节跳动 从需求到上线全流程 软件工程流程 需求评估 MVP
  • 线性代数-矩阵的本质
  • React基础入门之虚拟Dom
  • C++基础Ⅰ编译、链接
  • VMware和ubuntu配置Hadoop环境
  • uview2.0自定义tabbar
  • Star History 月度开源精选|Llama 2 及周边生态特辑
  • 修改电脑上路由表使笔记本默认走无线
  • Spring Cache的介绍以及怎么使用(redis)
  • 软考高级系统架构设计师系列论文六十九:论信息系统的安全风险评估
  • Ubuntu系统安装之后首需要做的事情
  • Qt——QPushButton控件的常见属性、方法和信号
  • AUTOSAR规范与ECU软件开发(实践篇)5.5 基于ISOLAR-A的系统级设计与配置方法(上)