×

Samza 入门

Samza 你好Samza 下载Samza 功能预览

Samza 详细介绍

Samza 背景Samza 概念Samza 结构

Samza 与其他流处理项目比较

Samza 比较介绍Samza MUPD8Samza 与 StormSamza 与 Spark Streaming

Samza API

Samza API概述

Samza 核心

Samza ContainerSamza 流Samza 序列化Samza 定期检查Samza 状态管理Samza 窗口功能Samza 协调器流Samza 事件循环Samza 指标Samza JMX

Samza 作业部署

Samza JobRunnerSamza 配置Samza 打包Samza YARN工作Samza 记录Samza 再处理Samza Web UI和REST APISamza 分离框架和作业部署

Samza YARN

Samza Application MasterSamza YARN执行隔离Samza 主机关联和 YARNSamza YARN资源本地化Samza YARN安全Samza 写入HDFSSamza 从HDFS文件读取

Samza 相关操作

Samza 安全Samza Kafka自动创建主题

Samza REST服务

Samza REST服务概观Samza REST服务资源Samza REST服务监视器

Samza 附录

附录一 工作资源附录二 任务资源附录三 远程调试附录四 从HDFS部署Samza工作附录五 部署Samza Job到CDH附录六 在多节点YARN中运行附录七 在没有联网的情况下运行附录八 Samza REST入门附录九 Async API和多线程指南附录十 代码附录十一 Samza配置参考

Samza API概述


在为 Samza 编写流处理器时,必须实现 StreamTask 或 AsyncStreamTask 接口。您应该为同步进程实现 StreamTask,其中消息处理在过程方法返回后完成。StreamTask 的一个例子是不涉及远程调用的计算:

package com.example.samza;

public class MyTaskClass implements StreamTask {

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    // process message
  }
}

另一方面,AsyncSteamTask 接口支持异步处理,其中,在 processAsync 方法返回,所述消息处理可能不是完整的。Java NIO,ParSeq 和 Akka 等各种并行库可用于进行异步调用,并通过调用 TaskCallback 来标记完成。Samza 将继续处理下一个消息或根据回调状态关闭容器。AsyncStreamTask 的一个示例是进行远程调用但不阻塞调用完成的计算:

package com.example.samza;

public class MyAsyncTaskClass implements AsyncStreamTask {

  public void processAsync(IncomingMessageEnvelope envelope,
                           MessageCollector collector,
                           TaskCoordinator coordinator,
                           TaskCallback callback) {
    // process message with asynchronous calls
    // fire callback upon completion, e.g. invoking callback from asynchronous call completion thread
  }
}

当您运行您的工作时,Samza 将创建您的课程的几个实例(可能在多台机器上)。这些任务实例处理输入流中的消息。

在您的工作配置中,您可以告诉 Samza 要消费的流。一个不完整的示例可能如下所示(有关详细信息,请参阅配置文档):

# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json

对于 Samza 从任务输入流接收到的每个消息,调用进程方法。该信封包含重要的三件事情:消息、秘钥以及该消息来自的流。

/** Every message that is delivered to a StreamTask is wrapped
 * in an IncomingMessageEnvelope, which contains metadata about
 * the origin of the message. */
public class IncomingMessageEnvelope {
  /** A deserialized message. */
  Object getMessage() { ... }

  /** A deserialized key. */
  Object getKey() { ... }

  /** The stream and partition that this message came from. */
  SystemStreamPartition getSystemStreamPartition() { ... }
}

键和值被声明为 Object,并且需要转换为正确的类型。如果不配置序列化器/解串器,它们通常是 Java 字节数组。解串器可以将这些字节转换为任何其他类型,例如上述 JSON 解串器将字节数组解析为 java.util.Map,java.util.List 和 String 对象。

该 getSystemStreamPartition() 方法返回一个 SystemStreamPartition 对象,该对象告诉您消息来自哪里。它由三部分组成:

  1. 系统:根据作业配置中定义的消息来自的系统的名称。您可以有多个输入和/或输出系统,每个系统具有不同的名称。
  2. 流名称:源系统中的流(话题,队列)的名称。这也在作业配置中定义。
  3. 分区:一个流通常被分成多个分区,每个分区由Samza分配给一个StreamTask实例。

API 如下所示:

/** A triple of system name, stream name and partition. */
public class SystemStreamPartition extends SystemStream {

  /** The name of the system which provides this stream. It is
      defined in the Samza job's configuration. */
  public String getSystem() { ... }

  /** The name of the stream/topic/queue within the system. */
  public String getStream() { ... }

  /** The partition within the stream. */
  public Partition getPartition() { ... }
}

在上面的示例作业配置中,系统名称为 “kafka”,流名称为 “PageViewEvent”。(名称 “kafka” 不是特别的 - 您可以给系统任何您想要的名称。)如果您有多个输入流加入到 StreamTask中,您可以使用 SystemStreamPartition 来确定您收到的消息类型。

如何发送消息?如果您在 StreamTask 中查看 process()方法,您将看到您收到一个 MessageCollector。

/** When a task wishes to send a message, it uses this interface. */
public interface MessageCollector {
  void send(OutgoingMessageEnvelope envelope);
}

要发送消息,您将创建一个 OutgoingMessageEnvelope 对象并将其传递给消息收集器。至少,信封指定要发送的消息,以及将其发送到的系统和流名称。或者,您可以指定分区键和其他参数。

注意:请仅使用方法中的 MessageCollector 对象 process()。如果您持有 MessageCollector 实例并稍后再次使用,您的邮件可能无法正确发送。

例如,这是一个简单的任务,将每个输入消息分成单词,并将每个单词作为单独的消息发出:

public class SplitStringIntoWords implements StreamTask {

  // Send outgoing messages to a stream called "words"
  // in the "kafka" system.
  private final SystemStream OUTPUT_STREAM =
    new SystemStream("kafka", "words");

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    String message = (String) envelope.getMessage();

    for (String word : message.split(" ")) {
      // Use the word as the key, and 1 as the value.
      // A second task can add the 1's to get the word count.
      collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, word, 1));
    }
  }
}

对于 AsyncStreamTask 示例,请遵循Samza Async API和多线程用户指南中的教程。有关 API 的更多详细信息,请参阅配置


分类导航

关注微信下载离线手册

bootwiki移动版 bootwiki
(群号:472910771)