sparkstreaming+kafka0.10.0集成指南官方文档翻译
(2018-08-29 00:08:48)
标签:
sparkstreamingkafkakafka0.10kafka1.0spark |
Spark Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)
译文:Spark Streaming + Kafka集成指南(Kafka broker版本0.10.0或更高版本)
The Spark Streaming integration for Kafka 0.10 is similar in design
to the 0.8
译文:Kafka 0.10的Spark Streaming集成在设计上与0.8 Direct Stream方法类似。 它提供简单的并行性,Kafka分区和Spark分区之间的1:1对应关系,以及对偏移和元数据的访问。 但是,由于较新的集成使用新的Kafka使用者API而不是简单的API,因此使用方法存在显着差异。 此版本的集成标记为实验版,因此API可能会发生变化。
Linking
For Scala/Java applications using SBT/Maven project definitions,
link your streaming application with the following artifact
(see
译文:对于使用SBT / Maven项目定义的Scala / Java应用程序,请将流应用程序与以下工件链接(有关详细信息,请参阅主编程指南中的链接部分)。
groupId = org.apache.spark
artifactId = spark-streaming-kafka-0-10_2.11
version = 2.3.1
Do not
译文:不要在org.apache.kafka工件(例如kafka-clients)上手动添加依赖项。 spark-streaming-kafka-0-10工件已经具有适当的传递依赖性,并且不同版本可能在难以诊断的方式上不兼容。
Creating a Direct Stream
Note that the namespace for the import includes the version, org.apache.spark.streaming.kafka010
创建直接流
请注意,导入的命名空间包括版本org.apache.spark.streaming.kafka010
import
val
val
stream.map(record
Each item in the stream is a
For possible kafkaParams, see
译文:流中的每个项目都是ConsumerRecord
有关可能的kafkaParams,请参阅Kafka使用者配置文档。 如果Spark批处理持续时间大于默认的Kafka心跳会话超时(30秒),请适当增加heartbeat.interval.ms和session.timeout.ms。 对于大于5分钟的批次,这将需要在代理上更改group.max.session.timeout.ms。 请注意,该示例将enable.auto.commit设置为false,有关讨论,请参阅下面的存储偏移。
LocationStrategies
The new Kafka consumer API will pre-fetch messages into buffers. Therefore it is important for performance reasons that the Spark integration keep cached consumers on executors (rather than recreating them for each batch), and prefer to schedule partitions on the host locations that have the appropriate consumers.
In most cases, you should use
译文:新的Kafka使用者API将预先获取消息到缓冲区。 因此,出于性能原因,Spark集成将缓存消费者保留在执行程序上(而不是为每个批处理重新创建它们),并且更喜欢在具有适当使用者的主机位置上安排分区,这一点很重要。
在大多数情况下,您应该使用LocationStrategies.PreferConsistent,如上所示。 这将在可用执行程序之间均匀分配分区。 如果您的执行程序与Kafka代理在同一主机上,请使用PreferBrokers,它更愿意在该分区的Kafka领导者上安排分区。 最后,如果分区之间的负载有很大的偏差,请使用PreferFixed。 这允许您指定分区到主机的显式映射(任何未指定的分区将使用一致的位置)。
The cache for consumers has a default maximum size of 64. If you
expect to be handling more than (64 * number of executors) Kafka
partitions, you can change this setting
via
If you would like to disable the caching for Kafka consumers, you
can set
The cache is keyed by topicpartition and group.id, so use
a
译文:消费者的缓存的默认最大大小为64.如果您希望处理超过(64 *个执行程序数)Kafka分区,则可以通过spark.streaming.kafka.consumer.cache.maxCapacity更改此设置。
如果要禁用Kafka使用者的缓存,可以将spark.streaming.kafka.consumer.cache.enabled设置为false。 可能需要禁用缓存来解决SPARK-19185中描述的问题。 一旦SPARK-19185解决,可以在Spark的更高版本中删除此属性。
缓存由topicpartition和group.id键入,因此每次调用createDirectStream时都要使用单独的group.id。
ConsumerStrategies
The new Kafka consumer API has a number of different ways to
specify topics, some of which require considerable
post-object-instantiation setup.ConsumerStrategies
ConsumerStrategies.Subscribe,
as shown above, allows you to subscribe to a fixed collection of
topics.
If you have specific consumer setup needs that are not met by the
options above,
译文:ConsumerStrategies
新的Kafka使用者API有许多不同的方法来指定主题,其中一些需要相当大的后对象实例化设置。 ConsumerStrategies提供了一种抽象,即使从检查点重新启动后,Spark也可以获得正确配置的使用者。
ConsumerStrategies.Subscribe,如上所示,允许您订阅固定的主题集合。 SubscribePattern允许您使用正则表达式来指定感兴趣的主题。 请注意,与0.8集成不同,使用Subscribe或SubscribePattern应响应在正在运行的流期间添加分区。 最后,Assign允许您指定固定的分区集合。 所有这三种策略都有重载的构造函数,允许您指定特定分区的起始偏移量。
如果您具有上述选项无法满足的特定消费者设置需求,则ConsumerStrategy是您可以扩展的公共类。
Creating an RDD
If you have a use case that is better suited to batch processing, you can create an RDD for a defined range of offsets.
译文:如果您的用例更适合批处理,则可以为定义的偏移范围创建RDD。
// Import dependencies and create kafka params as in Create Direct Stream above
val
val
Note that you cannot use
译文:请注意,您不能使用PreferBrokers,因为没有流,没有驱动程序端使用者可以自动为您查找代理元数据。 如有必要,请使用PreferFixed和您自己的元数据查找。
Obtaining Offsets
·
·
stream.foreachRDD
Note that the typecast to
译文:请注意,只有在createDirectStream结果调用的第一个方法中完成对HasOffsetRanges的类型转换才会成功,而不是在方法链的后面。 请注意,在任何混洗或重新分区的方法之后,RDD分区和Kafka分区之间的一对一映射不会保留,例如: reduceByKey()或window()。
Storing Offsets
Kafka delivery semantics in the case of failure depend on how and
when offsets are stored. Spark output operations
are
译文:存储偏移量
失败时的Kafka传递语义取决于存储偏移的方式和时间。 Spark输出操作至少一次。 因此,如果您想要等同于完全一次的语义,则必须在幂等输出后存储偏移量,或者在原子事务中将偏移量与输出一起存储。 通过这种集成,您可以提供3个选项,以提高可靠性(和代码复杂性),以便存储偏移量。
Checkpoints
If you enable Spark
译文:检查点
如果启用Spark检查点,则偏移量将存储在检查点中。 这很容易实现,但也有缺点。 您的输出操作必须是幂等的,因为您将获得重复输出; 交易不是一种选择。 此外,如果应用程序代码已更改,则无法从检查点恢复。 对于计划的升级,您可以通过在旧代码的同时运行新代码来缓解这种情况(因为无论如何输出都必须是幂等的,它们不应该发生冲突)。 但是对于需要更改代码的计划外故障,除非您有其他方法来识别已知良好的起始偏移,否则您将丢失数据。
Kafka itself
Kafka has an offset commit API that stores offsets in a special
Kafka topic. By default, the new consumer will periodically
auto-commit offsets. This is almost certainly not what you want,
because messages successfully polled by the consumer may not yet
have resulted in a Spark output operation, resulting in undefined
semantics. This is why the stream example above sets
“enable.auto.commit” to false. However, you can commit offsets to
Kafka after you know your output has been stored, using
the
译文:Kafka有一个偏移提交API,用于在特殊的Kafka主题中存储偏移量。 默认情况下,新消费者将定期自动提交偏移量。 这几乎肯定不是您想要的,因为消费者成功轮询的消息可能尚未导致Spark输出操作,从而导致未定义的语义。 这就是上面的流示例将“enable.auto.commit”设置为false的原因。 但是,您可以使用commitAsync API在知道输出已存储后向Kafka提交偏移量。 与检查点相比的好处是,无论您的应用程序代码如何变化,Kafka都是一个耐用的商店。 但是,Kafka不是交易性的,因此您的输出必须仍然是幂等的。
stream.foreachRDD
As with HasOffsetRanges, the cast to CanCommitOffsets will only succeed if called on the result of createDirectStream, not after transformations. The commitAsync call is threadsafe, but must occur after outputs if you want meaningful semantics.
译文:与HasOffsetRanges一样,只有在createDirectStream的结果上调用而不是在转换后,才能成功转换为CanCommitOffsets。 commitAsync调用是线程安全的,但如果您需要有意义的语义,则必须在输出后发生。
Your own data store
For data stores that support transactions, saving offsets in the same transaction as the results can keep the two in sync, even in failure situations. If you’re careful about detecting repeated or skipped offset ranges, rolling back the transaction prevents duplicated or lost messages from affecting results. This gives the equivalent of exactly-once semantics. It is also possible to use this tactic even for outputs that result from aggregations, which are typically hard to make idempotent.
译文:您自己的数据存储
对于支持事务的数据存储,在结果的同一事务中保存偏移可以使两者保持同步,即使在故障情况下也是如此。 如果您在检测重复或跳过的偏移范围时非常小心,则回滚事务可防止重复或丢失的消息影响结果。 这给出了完全一次语义的等价物。 甚至对于由聚合产生的输出也可以使用这种策略,这通常很难使幂等。
// The details depend on your data store, but the general idea looks like this
// begin from the the
offsets committed to the databaseval
val
stream.foreachRDD