×

Samza 入门

Samza 你好Samza 下载Samza 功能预览

Samza 详细介绍

Samza 背景Samza 概念Samza 结构

Samza 与其他流处理项目比较

Samza 比较介绍Samza MUPD8Samza 与 StormSamza 与 Spark Streaming

Samza API

Samza API概述

Samza 核心

Samza ContainerSamza 流Samza 序列化Samza 定期检查Samza 状态管理Samza 窗口功能Samza 协调器流Samza 事件循环Samza 指标Samza JMX

Samza 作业部署

Samza JobRunnerSamza 配置Samza 打包Samza YARN工作Samza 记录Samza 再处理Samza Web UI和REST APISamza 分离框架和作业部署

Samza YARN

Samza Application MasterSamza YARN执行隔离Samza 主机关联和 YARNSamza YARN资源本地化Samza YARN安全Samza 写入HDFSSamza 从HDFS文件读取

Samza 相关操作

Samza 安全Samza Kafka自动创建主题

Samza REST服务

Samza REST服务概观Samza REST服务资源Samza REST服务监视器

Samza 附录

附录一 工作资源附录二 任务资源附录三 远程调试附录四 从HDFS部署Samza工作附录五 部署Samza Job到CDH附录六 在多节点YARN中运行附录七 在没有联网的情况下运行附录八 Samza REST入门附录九 Async API和多线程指南附录十 代码附录十一 Samza配置参考

Samza Async API和多线程用户指南


本教程提供了使用 Samza 异步 API 和多线程的示例和指南。

多线程同步过程

如果您的工作过程涉及同步 IO 或阻塞 IO,则可以简单地配置 Samza 内置线程池来并行运行任务。在以下示例中,SyncRestTask 使用 Jersey 客户端在每个进程()中进行休息调用。

public class SyncRestTask implements StreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    Response response = target.request().get();
    System.out.println("Response status code " + response.getStatus() + " received.");
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

默认情况下,Samza 将在单个线程中顺序运行此任务。在下面我们配置大小为16的线程池并行运行任务:

# Thread pool to run synchronous tasks in parallel.
job.container.thread.pool.size=16

注意:线程池将用于运行任务的所有同步操作,包括 StreamTask.process(),WindowableTask.window()和内部的 Task.commit()。这是为了最大化任务之间的并行性以及减少阻塞时间。在多线程中运行任务时,默认情况下,Samza 仍保证任务内的消息的按顺序处理。

AsyncStreamTask API 的异步过程

如果您的工作过程是异步的,例如,进行非阻塞的远程 IO 调用,AsyncStreamTask 接口将为其提供支持。在下面的例子中,AsyncRestTask 会使异步休息调用,并在完成后触发回调。

public class AsyncRestTask implements AsyncStreamTask, InitableTask, ClosableTask {
  private Client client;
  private WebTarget target;

  @Override
  public void init(Config config, TaskContext taskContext) throws Exception {
    client = ClientBuilder.newClient();
    target = client.target("http://example.com/resource/").path("hello");
  }

  @Override
  public void processAsync(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator, final TaskCallback callback) {
    target.request().async().get(new InvocationCallback() {
      @Override
      public void completed(Response response) {
        System.out.println("Response status code " + response.getStatus() + " received.");
        callback.complete();
      }

      @Override
      public void failed(Throwable throwable) {
        System.out.println("Invocation failed.");
        callback.failure(throwable);
      }
    });
  }

  @Override
  public void close() throws Exception {
    client.close();
  }
}

在上面的例子中,processAsync()返回时,进程不完整。在来自泽西客户端的回调线程中,我们触发 TaskCallback 以指示进程完成。为了确保在一定时间间隔(例如5秒)内触发回调,您可以配置以下属性:

# Timeout for processAsync() callback. When the timeout happens, it will throw a TaskCallbackTimeoutException and shut down the container.
task.callback.timeout.ms=5000

注意:默认情况下,Samza 还保证 AsyncStreamTask 中消息的按顺序进程,这意味着在触发前一个 processAsync()回调之前,任务的下一个 processAsync()才会被调用。

无序过程

在上述两种情况下,Samza 默认支持按顺序进行。通过允许任务并行处理多个未完成的消息也支持进一步的并行性。以下配置允许一个任务一次处理最多4个未完成的消息:

# Max number of outstanding messages being processed per task at a time, applicable to both StreamTask and AsyncStreamTask.
task.max.concurrency=4

注意:在 AsyncStreamTask 的情况下,processAsync()仍然按消息到达的顺序调用,但完成可能会出错。在具有多线程的 StreamTask 的情况下,process()可以无序运行,因为它们被分派到线程池。应此选项不是在需要输出的严格的顺序使用。

保证语义

在任何情况下,Samza 保证以下语义:

  • Samza 是安全的。您可以安全地访问任务线程中的键值存储,写入消息和检查点偏移量的作业状态。如果您在任务之间共享其他数据,例如全局变量或静态数据,则如果可以通过多个线程并发访问数据,例如 StreamTask 在配置的线程池中运行多个线程,则不会线程安全。对于任务中的状态(如成员变量),Samza 保证进程,窗口和提交的相互排他性,因此这些操作之间不会有并发的修改,任何来自一个操作的状态变化都将完全可见。
  • 当没有未完成的进程 / processAsync 并且没有新的进程/进程同步调用时,WindowableTask.window 被调用,直到完成为止。Samza 引擎负责确保及时调用该窗口。
  • 检查点保证仅覆盖完全处理的事件。它保存在 commit()方法中。

分类导航

关注微信下载离线手册

bootwiki移动版 bootwiki
(群号:472910771)