Manually Maintaining Kafka Offsets in Flink
Published: 2021-07-17 15:49:20
Original content; feel free to repost. My goal is simply to save you time.
First, why maintain offsets manually at all: due to environment constraints we are still reading from Kafka 0.8, and pushing an upgrade through is hard.
The benefit of maintaining offsets yourself is that you can record the offset at each point in time. If an upstream log turns out to be bad, you can pull out your recorded offsets and timestamps, find the offset for the time in question, and replay from there to repair the historical data.
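Before the Flink code, here is what that lookup step can look like, as a minimal self-contained sketch (my own illustration; the timestamps and offsets are made-up placeholders): given the (timestamp -> offset) checkpoints you recorded for one partition, find the offset that was in effect at the repair point.

import scala.collection.immutable.TreeMap

object OffsetLookup {
  // Checkpoints recorded for one partition: epoch-millis timestamp -> offset.
  // These values are placeholders; load them from wherever you log offsets.
  val recorded: TreeMap[Long, Long] = TreeMap(
    1626508800000L -> 1200L,
    1626512400000L -> 3400L,
    1626516000000L -> 5100L)

  // Largest recorded timestamp <= ts, i.e. the offset in effect at that time.
  def offsetAt(ts: Long): Option[Long] =
    recorded.filter(_._1 <= ts).lastOption.map(_._2)

  def main(args: Array[String]): Unit =
    println(offsetAt(1626513000000L)) // Some(3400): replay this partition from offset 3400
}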
No more preamble. If you have written Spark, the code below will make the approach obvious. FlinkKafkaConsumer08 is the Flink connector class; the part to focus on is KeyedDeserializationSchema, which hands you each record's Kafka partition and offset. Implement it and you are done. Here is the connector class for reference:
public class FlinkKafkaConsumer08<T> extends FlinkKafkaConsumerBase<T> {
    private static final long serialVersionUID = -6272159445203409112L;
    public static final String GET_PARTITIONS_RETRIES_KEY = "flink.get-partitions.retry";
    public static final int DEFAULT_GET_PARTITIONS_RETRIES = 3;
    private final Properties kafkaProperties;

    public FlinkKafkaConsumer08(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }

    public FlinkKafkaConsumer08(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }

    public FlinkKafkaConsumer08(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
    }

    public FlinkKafkaConsumer08(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(topics, (Pattern) null, deserializer, props);
    }

    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern subscriptionPattern, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(subscriptionPattern, new KeyedDeserializationSchemaWrapper<>(valueDeserializer), props);
    }

    @PublicEvolving
    public FlinkKafkaConsumer08(Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this((List<String>) null, subscriptionPattern, deserializer, props);
    }

    private FlinkKafkaConsumer08(List<String> topics, Pattern subscriptionPattern, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(topics, subscriptionPattern, deserializer,
                PropertiesUtil.getLong(Preconditions.checkNotNull(props, "props"),
                        "flink.partition-discovery.interval-millis", Long.MIN_VALUE),
                !PropertiesUtil.getBoolean(props, "flink.disable-metrics", false));
        this.kafkaProperties = props;
        validateZooKeeperConfig(props);
        validateAutoOffsetResetValue(props);
    }

    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            Map<KafkaTopicPartition, Long> assignedPartitionsWithInitialOffsets,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext,
            OffsetCommitMode offsetCommitMode,
            MetricGroup consumerMetricGroup,
            boolean useMetrics) throws Exception {
        long autoCommitInterval = offsetCommitMode == OffsetCommitMode.KAFKA_PERIODIC
                ? PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L)
                : -1L;
        return new Kafka08Fetcher<>(sourceContext, assignedPartitionsWithInitialOffsets,
                watermarksPeriodic, watermarksPunctuated, runtimeContext, this.deserializer,
                this.kafkaProperties, autoCommitInterval, consumerMetricGroup, useMetrics);
    }

    protected AbstractPartitionDiscoverer createPartitionDiscoverer(KafkaTopicsDescriptor topicsDescriptor, int indexOfThisSubtask, int numParallelSubtasks) {
        return new Kafka08PartitionDiscoverer(topicsDescriptor, indexOfThisSubtask, numParallelSubtasks, this.kafkaProperties);
    }

    protected boolean getIsAutoCommitEnabled() {
        return PropertiesUtil.getBoolean(this.kafkaProperties, "auto.commit.enable", true)
                && PropertiesUtil.getLong(this.kafkaProperties, "auto.commit.interval.ms", 60000L) > 0L;
    }

    protected Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(Collection<KafkaTopicPartition> partitions, long timestamp) {
        throw new UnsupportedOperationException("Fetching partition offsets using timestamps is only supported in Kafka versions 0.10 and above.");
    }

    protected static void validateZooKeeperConfig(Properties props) {
        if (props.getProperty("zookeeper.connect") == null) {
            throw new IllegalArgumentException("Required property 'zookeeper.connect' has not been set in the properties");
        } else if (props.getProperty("group.id") == null) {
            throw new IllegalArgumentException("Required property 'group.id' has not been set in the properties");
        } else {
            try {
                Integer.parseInt(props.getProperty("zookeeper.session.timeout.ms", "0"));
            } catch (NumberFormatException var3) {
                throw new IllegalArgumentException("Property 'zookeeper.session.timeout.ms' is not a valid integer");
            }
            try {
                Integer.parseInt(props.getProperty("zookeeper.connection.timeout.ms", "0"));
            } catch (NumberFormatException var2) {
                throw new IllegalArgumentException("Property 'zookeeper.connection.timeout.ms' is not a valid integer");
            }
        }
    }

    private static void validateAutoOffsetResetValue(Properties config) {
        String val = config.getProperty("auto.offset.reset", "largest");
        if (!val.equals("largest") && !val.equals("latest") && !val.equals("earliest") && !val.equals("smallest")) {
            throw new IllegalArgumentException("Cannot use 'auto.offset.reset' value '" + val + "'. Possible values: 'latest', 'largest', 'earliest', or 'smallest'.");
        }
    }
}
Now the job code itself; this is all you need:
import java.util.Properties

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema

object FlinkDemo {

  // One Kafka record plus the metadata we want to keep: topic, partition and offset.
  case class KafkaMsg(key: String, value: String, topic: String, partition: Int, offset: Long)

  // Implementing KeyedDeserializationSchema is what exposes the partition and offset.
  class TypedKeyedDeserializationSchema extends KeyedDeserializationSchema[KafkaMsg] {

    def deserialize(key: Array[Byte],
                    value: Array[Byte],
                    topic: String,
                    partition: Int,
                    offset: Long): KafkaMsg =
      KafkaMsg(
        if (key == null) null else new String(key), // Kafka record keys may be null
        new String(value),
        topic,
        partition,
        offset)

    def isEndOfStream(e: KafkaMsg): Boolean = false

    def getProducedType(): TypeInformation[KafkaMsg] = createTypeInformation
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    // Kafka 0.8 still coordinates through ZooKeeper, so both addresses are needed.
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test-flink")

    val topic = "click"
    val consumer = new FlinkKafkaConsumer08(topic, new TypedKeyedDeserializationSchema(), properties)

    env.addSource(consumer).print()
    env.execute()
  }
}
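And this is where the manual maintenance pays off. Below is a rough sketch of both halves, reusing TypedKeyedDeserializationSchema from FlinkDemo above, with two stated assumptions: print() stands in for whatever store you actually log offsets to (MySQL, HBase, Redis...), and setStartFromSpecificOffsets comes from FlinkKafkaConsumerBase in Flink 1.3+. The partition numbers and offsets are made-up placeholders.

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition

import FlinkDemo._

object OffsetRepairDemo {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "test-flink")

    val consumer = new FlinkKafkaConsumer08("click", new TypedKeyedDeserializationSchema(), properties)

    // On a repair run: seed the consumer with the offsets you looked up for the
    // repair time (placeholder values below). Requires Flink 1.3+.
    val specificOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()
    specificOffsets.put(new KafkaTopicPartition("click", 0), 1200L)
    specificOffsets.put(new KafkaTopicPartition("click", 1), 3400L)
    consumer.setStartFromSpecificOffsets(specificOffsets)

    // On every run: record each record's coordinates alongside a timestamp so
    // they can be looked up later. print() is a stand-in for a real store.
    env.addSource(consumer)
      .map(m => s"${System.currentTimeMillis()},${m.topic},${m.partition},${m.offset}")
      .print()

    env.execute("offset-repair")
  }
}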
Reposted from: https://blog.csdn.net/huzechen/article/details/98101005