×

Samza 入门

Samza 你好Samza 下载Samza 功能预览

Samza 详细介绍

Samza 背景Samza 概念Samza 结构

Samza 与其他流处理项目比较

Samza 比较介绍Samza MUPD8Samza 与 StormSamza 与 Spark Streaming

Samza API

Samza API概述

Samza 核心

Samza ContainerSamza 流Samza 序列化Samza 定期检查Samza 状态管理Samza 窗口功能Samza 协调器流Samza 事件循环Samza 指标Samza JMX

Samza 作业部署

Samza JobRunnerSamza 配置Samza 打包Samza YARN工作Samza 记录Samza 再处理Samza Web UI和REST APISamza 分离框架和作业部署

Samza YARN

Samza Application MasterSamza YARN执行隔离Samza 主机关联和 YARNSamza YARN资源本地化Samza YARN安全Samza 写入HDFSSamza 从HDFS文件读取

Samza 相关操作

Samza 安全Samza Kafka自动创建主题

Samza REST服务

Samza REST服务概观Samza REST服务资源Samza REST服务监视器

Samza 附录

附录一 工作资源附录二 任务资源附录三 远程调试附录四 从HDFS部署Samza工作附录五 部署Samza Job到CDH附录六 在多节点YARN中运行附录七 在没有联网的情况下运行附录八 Samza REST入门附录九 Async API和多线程指南附录十 代码附录十一 Samza配置参考

Samza 序列化


从数据持久状态存储器读取或写入的每个消息都需要最终序列化为字节(通过网络发送或写入磁盘)。有不同的地方可以发生序列化和反序列化:

  1. 在客户端库中:例如,发布到 Kafka 并从 Kafka 消费的库支持可插拔序列化。
  2. 在任务实现中:您的进程方法可以使用原始字节数组作为输入和输出,并进行任何解析和序列化本身。
  3. 两者之间:Samza 提供串行解串器,或构成的层 SERDES 的简称。

你可以使用任何有意义的工作;Samza 不会对您施加任何特定的数据模型或序列化方案。然而,最干净的解决方案通常是使用 Samza 的 serde 层。以下配置示例显示如何使用它。

# Define a system called "kafka"
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job is going to consume a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serde called "json" which parses/serializes JSON objects
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Define a serde called "integer" which encodes an integer as 4 binary bytes (big-endian)
serializers.registry.integer.class=org.apache.samza.serializers.IntegerSerdeFactory

# For messages in the "PageViewEvent" topic, the key (the ID of the user viewing the page)
# is encoded as a binary integer, and the message is encoded as JSON.
systems.kafka.streams.PageViewEvent.samza.key.serde=integer
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

# Define a key-value store which stores the most recent page view for each user ID.
# Again, the key is an integer user ID, and the value is JSON.
stores.LastPageViewPerUser.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.LastPageViewPerUser.changelog=kafka.last-page-view-per-user
stores.LastPageViewPerUser.key.serde=integer
stores.LastPageViewPerUser.msg.serde=json

每个 serde 都用工厂类定义。Samza 带有几个内置的 serdes,用于UTF-8字符串,二进制编码整数,JSON 等。以下是 Samza 支持的 serdes 的完整列表。

Serde 名字 数据处理
UTF-8字符串
整数 二进制编码整数
序列化 可序列化对象类型
长数据类型
JSON JSON格式的数据
字节 纯字节(无效) - 适用于二进制消息
字节缓冲区 字节缓冲区

您还可以通过实现SerdeFactory界面来创建自己的串行器。

你给一个 serde 的名字(例如上面的例子中的 “json” 和 “integer”)只是为了方便你的作业配置; 你可以选择任何你喜欢的名字。对于每个流和每个状态存储,您可以使用 serde 名称声明消息应如何序列化和反序列化。

如果您不声明 serde,Samza 只需将对象传递到任务实例和系统流之间。在这种情况下,您的任务需要发送和接收底层客户端库使用的任何类型的对象。

用于发送和接收消息的所有 Samza API 都键入 Object。这意味着您必须将消息转换为正确的类型才能使用它们。这是一个更多的代码,但它的优势是 Samza 不限于任何特定的数据模型。


分类导航

关注微信下载离线手册

bootwiki移动版 bootwiki
(群号:472910771)