当前位置: 首页 > news >正文

flink如何基于Pekko实现RPC调用

摘要

通过阅读flink源码,了解flink是如何基于Pekko实现远程RPC调用的

Pekko实现远程调用

Flink 的 RPC 框架底层是构建在 Pekko 的 actor 模型之上的,了解Pekko如何使用,对后续源码的阅读有帮助。

Apache Pekko(原为 Akka 的一个分支)是一个强大的工具包,用于构建并发、分布式和可扩展的系统。它基于经典的 Actor 模型,提供了一种事件驱动、非阻塞的编程范式,使开发者能够更轻松地构建容错性强、模块化清晰的分布式应用。

引入依赖

确保你使用的是 Apache Pekko 的 Maven 依赖:

<dependencies><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-actor_2.13</artifactId><version>1.0.2</version></dependency><dependency><groupId>org.apache.pekko</groupId><artifactId>pekko-remote_2.13</artifactId><version>1.0.2</version></dependency>
</dependencies>

定义消息类(RPC 通信协议)

public class HelloRequest implements java.io.Serializable {  public final String message;  public HelloRequest(String message) {  this.message = message;  }  
}
public class HelloResponse implements java.io.Serializable {  public final String reply;  public HelloResponse(String reply) {  this.reply = reply;  }  @Override  public String toString() {  return reply;  }  
}

HelloRequestHelloResponse 是在使用 Pekko远程通信 时的消息协议类,也就是你定义的“请求消息”和“响应消息”。它们是通过网络在客户端与服务端之间传输的,所以必须满足可序列化(Serializable)的要求。

服务端代码(远程服务)

HelloActor.java

public class HelloActor extends AbstractActor {  @Override  public Receive createReceive() {  return receiveBuilder()  .match(HelloRequest.class, req -> {  System.out.println("服务端收到消息: " + req.message);  // 回复客户端  getSender().tell(new HelloResponse("你好,客户端,我收到了:" + req.message), getSelf());  })  .build();  }  public static Props props() {  return Props.create(HelloActor.class);  }  
}

在 Pekko 中,HelloActor 相当于传统 RPC 框架中的服务实现类,但其处理逻辑是基于 消息驱动模型 而非方法调用。Pekko 的核心设计理念是:Actor 只对接收到的消息做出反应,并保持自身状态独立和可并发执行

以下是关键点说明:

  • createReceive() 方法 定义了该 Actor 支持的消息类型和对应的处理逻辑。使用 receiveBuilder().match(...).build() 来设置“消息类型 → 响应处理”的映射。
  • getSender().tell(...) 表示将处理结果异步返回给消息发送者,它等价于传统 RPC 中的“返回值”机制,只不过是通过消息的方式返回。
  • Props.create(...) 返回一个 Props 实例,描述了如何构造该 Actor。这类似于构造函数的封装工厂。Props 是 Actor 的构造“配方”,用于 ActorSystem.actorOf(...) 创建真正的 Actor 实例。

ServerApp.java

public class ServerApp {  public static void main(String[] args) {  // 使用硬编码配置启动远程 ActorSystem        Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 25520            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ServerSystem", config);  // 启动 HelloActor,名字是 helloActor,供客户端远程访问  ActorRef actorRef = system.actorOf(HelloActor.props(), "helloActor");  System.out.println("服务端已启动,等待远程调用...");  }  
}

代码说明

  • 用 Java 代码动态构造 Pekko 配置(替代 application.conf 文件)
  • pekko.actor.serialize-messages = on 强制所有 Actor 之间发送的消息都走序列化流程(即使是本地通信)
  • ActorSystem.create(...) 创建了一个名为 ServerSystem 的远程 Actor 系统。
  • 指定 IP 和端口为 127.0.0.1:25520,就像传统 RPC 服务绑定地址。
  • 启动一个名为 helloActor 的 actor,客户端稍后通过这个名字进行访问。

客户端代码

ClientApp.java

public class ClientApp {  public static void main(String[] args) throws Exception {  // 使用硬编码配置启动客户端 ActorSystem,端口 0 表示随机  Config config = ConfigFactory.parseString("""  pekko.actor.provider = remote            pekko.remote.artery.canonical.hostname = "127.0.0.1"            pekko.remote.artery.canonical.port = 0            pekko.actor.allow-java-serialization = on            pekko.actor.serialize-messages = on            """);  ActorSystem system = ActorSystem.create("ClientSystem", config);  // 远程 actor 路径,相当于 RPC 服务地址 + 接口名  String remotePath = "pekko://ServerSystem@127.0.0.1:25520/user/helloActor";  // 选择远程 actor,相当于创建客户端 stub        ActorSelection selection = system.actorSelection(remotePath);  // 使用 ask 模式发送消息,并接收响应(模拟同步 RPC 调用)  CompletionStage<Object> future =  Patterns.ask(selection, new HelloRequest("这是来自客户端的问候"), Duration.ofSeconds(10));  // 等待响应结果(阻塞)  future.thenApply(response -> {  if (response instanceof HelloResponse helloResponse) {  return "客户端收到回复: " + helloResponse.reply;  } else {  return "收到未知回复: " + response;  }  })  .exceptionally(ex -> "调用失败: " + ex.getMessage())  .thenAccept(System.out::println).toCompletableFuture().join();  system.terminate();  }  
}

代码说明:

  • ActorSelection是一种 actor地址定位方式,它类似于 DNS 查询,可以根据路径去“找”一个远程 actor
  • Patterns.ask(...) 就像传统 RPC 的同步调用,它封装了发送、等待响应的过程。Duration.ofSeconds(3) 指定超时时间。.get() 阻塞等待结果,实际底层是异步实现。
    Pekko(或 Akka)中,如果你不需要请求-响应(ask),而只是发送消息给 Actorfire-and-forget),你可以直接使用 ActorRef.tell(...) 方法。
// 从 ActorSystem 中选择一个路径为 "/user/helloActor" 的 Actor(可能还没拿到真实引用)
// 注意:这个路径必须匹配一个已存在的 Actor,否则会 resolve 失败
ActorSelection selection = actorSystem.actorSelection("/user/helloActor");// 异步解析 selection,尝试获取对应 Actor 的真正引用 ActorRef(带超时)
CompletionStage<ActorRef> futureRef = selection.resolveOne(Duration.ofSeconds(3));// 当成功获取 ActorRef 后,使用 tell 发送一条消息,不需要返回(fire-and-forget)
futureRef.thenAccept(ref -> ref.tell("你好", ActorRef.noSender()));

flink的RPC框架如何使用

Flink 基于Pekko实现了自己RPC框架。当需要组件间需要使用RPC服务时,只需要定义接口、编写服务端接口逻辑即可。FlinkRpc框架自己会完成接收远程请求、调度线程、安全并发、处理生命周期等工作,让你像写本地对象一样写分布式服务。

本来想直接使用flinkrpc模块创建一个简单的demo项目来说明的,但是由于Flink使用了自定义的类加载器(如 SubmoduleClassLoader)来隔离不同模块(尤其是用户代码、插件、RPC 的动态 proxy 等)导致类不可见的问题

org.flink.MyServiceGateway referenced from a method is not visible from class loader: org.apache.flink.core.classloading.SubmoduleClassLoader

所以找了flink其中一个rpc服务来进行说明

Dispatcher组件

Dispatcher集群调度的中枢组件,它的作用相当于一个集群控制器,负责接收作业、分配作业、启动作业执行组件、以及监控作业生命周期。虽然Dispatcher只是在JobManager内使用,类似
伪分布式一样,但其创建与使用流程和真正的远程RPC组件是一样的。

DIspatcher在集群启动的时候,通过DispatcherFactory创建,StandaloneSession模式下,工厂实现类为SessionDispatcherFactory

下面以Dispatcher组件为例进行说明如何基于flinkrpc框架实现一个rpc服务。

rpc框架使用流程

使用流程大致如下:

  1. 定义 RpcGateway 接口作为rpc协议
  2. 继承 RpcEndpoint或者FencedRpcEndpoint 并实现RpcGateWay接口
  3. 使用 RpcService 注册服务(启动服务端)
  4. 使用RpcService连接服务端(获取client)

步骤1.定义 RpcGateway 接口

在这里插入图片描述

Dispatcher的RPC接口类是DispatcherGateway, FencedRpcGatewayRpcGateway的子接口。 rpc方法的返回值必须是 CompletableFuture<T> 类型,这是 Flink RPC 框架的设计要求

步骤2. 实现服务端

StandaloneSession模式下,Dispatcher的实现类是StandaloneDispatcher,该类是Dispacher的子类。Dispatcher类继承FencedRpcEndpoint类并实现DispatcherGateway接口
在这里插入图片描述

RpcEndpointFlink自研RPC`框架中用于实现远程服务端逻辑的抽象类,它帮你处理 RPC 生命周期、消息分发、线程安全调度等问题,其子类只需专注于“我要提供什么服务”即可。

步骤3. 启动服务

通过工厂创建了Dispatcher对象后,调用其start()方法启动服务
在这里插入图片描述

步骤4. 远程调用

提交job的时候,会调用dispatchersubmitJob启动并调度该作业。
在这里插入图片描述

gateway是一个DispatcherGateway对象,通过下面的代码获得到的,相当于Client。
在这里插入图片描述

通过该对象调用接口方法即可发起远程调用。由于Dispatcher的客户端代码从创建到使用的代码分的太散了,不方便说明,下面通过一个简单的示例来描述Client的创建流程。

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);MyServiceGateway gateway = gatewayFuture.get();gateway.sayHello("Flink").thenAccept(System.out::println);

MyServiceGateway.class就是定义的RpcGateway接口, gateway是一个远程代理对象了,调用它就等于远程 RPC 调用!

Client是如何发送消息的

已知flink底层是利用Pekko来实现rpc调用的,再次回顾flink rpc示例代码中可以想到

CompletableFuture<MyServiceGateway> gatewayFuture = rpcService.connect("akka://flink@127.0.0.1:6123/user/myService",MyServiceGateway.class
);
MyServiceGateway gateway = gatewayFuture.get();
gateway.sayHello("Flink").thenAccept(System.out::println);

该gateway对象发起远程调用,本质上应该是使用了类似下面的代码来发送消息的

CompletionStage<Object> future =  Patterns.ask(selection, "Flink", Duration.ofSeconds(3));

这个gateway对象是由rpcService.connect返回的. rpcService是一个RpcService接口对象,其实现就4个,排除掉测试用的就剩一个 PekkoRpcService了。

connect方法的源码
继续看connect方法的源码,首先会先调用resolveActorAddress解析入参的rpc地址"akka://flink@127.0.0.1:6123/user/myService"得到一个ActorRef对象

private CompletableFuture<ActorRef> resolveActorAddress(String address) {  final ActorSelection actorSel = actorSystem.actorSelection(address);  return actorSel.resolveOne(configuration.getTimeout())  .toCompletableFuture()  .exceptionally(  error -> {  throw new CompletionException(  new RpcConnectionException(  String.format(  "Could not connect to rpc endpoint under address %s.",  address),  error));  });  
}

获取到ActorRef后,使用 Java 的 动态代理机制创建一个实现了MyServiceGateway接口的代理对象 proxy
在这里插入图片描述

既然是动态代理,那就得看Handler方法里面的逻辑了,创建Handler的invocationHandlerFactory代码如下:
在这里插入图片描述

查看对应的invoke方法会看到实际发消息的是invokeRpc
在这里插入图片描述

所以最终actor.tell是在这里被调用的

转换成rpc参数的逻辑如下,只是将被调用方法所需的参数与信息封装成MethodInvocation对象
在这里插入图片描述

Server是接收处理消息的

前面的代码已经知道了client通过Pekko的actor发送了消息,现在要看Server这边是怎么处理的了(找到Actor处理RpcInvocation消息)。

服务端需要继承RpcEndpoint类,并在构建的时候传递rpcService对象`

final RpcService rpcService = ...; // 通常通过 PekkoRpcServiceUtils 创建
final String endpointId = "myService";MyServiceEndpoint endpoint = new MyServiceEndpoint(rpcService, endpointId);
endpoint.start();  // 启动 RPC 服务端

查看RpcEndpoint的构造函数,可以看到利用rpcService对象启动了一个rpcServer
在这里插入图片描述

继续往下看前需要了解Actor是如何处理消息的:
​ActorSystem​​ 是Pekko应用的中央控制枢纽,作为单例容器管理所有Actor的层级结构和生命周期。当发送消息给远程Actor时,ActorSystem会自动将消息序列化并通过网络传输到目标节点,在远程节点反序列化后放入目标ActorMailbox队列,最终由目标节点的ActorSystem调度线程执行消息处理,整个过程对开发者完全透明,如同本地调用一般。

可以粗略的认为:一个Actor等同于一个Server端(轻量级),Actor内有一个队列,当有新的消息从客户端发送过来就放到该队列中。然后有一个线程不断从队列中取消息,然后调用该 Actor 的 createReceive() 所定义的行为处理消息。

了解的Actor是如何接收信息后,继续看PekkoRpcServicestartServer方法,其中调用下面的方法,通知另一个Actor来创建本RpcEndpoin对应的Actor
在这里插入图片描述

那么就要找出负责创建Actor的这个supervisor(Actor)在哪里,才能继续往下看了。

很容易就可以看到PekkoRpcService对象它的构造函数中调用下面的函数找到对应的Actor的具体类型
在这里插入图片描述

查看SupervisroActor类的createReceive()就可以看到真正创建actor的逻辑了

@Override  
public Receive createReceive() {  return receiveBuilder()  .match(StartRpcActor.class, this::createStartRpcActorMessage)  .matchAny(this::handleUnknownMessage)  .build();  
}

flink rpc框架中,所有RpcEndpoint对应的Actor的类型都是PekkoRpcActor, 只是名字不一样而已。在PekkoRpcActorCreateReceive()可以看到与Client发送过来的RPC消息相对应的处理逻辑。
在这里插入图片描述

在这里插入图片描述

通过反射调用方法,此处的rpcEndpoint就是我们继承了RcpEndpoint的对象
在这里插入图片描述

到此,我们就知道了服务端的业务方法是如何被调用了。

RpcService的作用

在前面介绍 Flink 中 Client 与 Server 如何工作的过程中,我们可以看到其底层是通过 Pekko实现远程通信的。但在调用流程中,业务代码中并没有直接与 ActorSystemActorRef 等 Pekko 原生类打交道。这是因为 Flink 通过一层抽象 RpcService优雅地屏蔽了底层通信实现的细节

// 1. 创建 RpcService(基于 Pekko 实现)
RpcService rpcService = ...;// 2. 实例化 Dispatcher(继承自 RpcEndpoint)
StandaloneDispatcher dispatcher = new StandaloneDispatcher(rpcService, ...);// 3. 注册服务端
DispatcherGateway dispatcherGateway = rpcService.startServer(dispatcher);// 4. 客户端连接(可在其他进程中执行)
rpcService.connect("pekko://flink@host:6123/user/dispatcher", DispatcherGateway.class);

如果没有 RpcService 这一层抽象,Flink 的组件(如 Dispatcher、JobMaster)之间想要通信,就必须直接操作 Pekko 的底层 API,比如:

  • 使用 ActorSystem 创建 ActorRef
  • 使用 tell()ask() 发送消息;
  • 管理消息序列化和远程地址;
  • 处理超时、线程调度等复杂细节。

这会导致:

  • Actor 概念侵入业务逻辑,开发就需要学习Actor相关的知识;
  • 接口强耦合通信实现,未来若切换通信框架非常困难;
  • 本地调用与远程调用流程不统一,维护复杂。
http://www.lryc.cn/news/572287.html

相关文章:

  • 神经网络试题
  • DL___线性神经网络
  • 数据结构 二叉树理论、递归理论与快速排序理论 6.19
  • 01.线性代数是如何将复杂的数据结构转化为可计算的数学问题,这个过程是如何进行的
  • OpenAI的Prompt工程
  • 03.自动特征提取(深度学习)核心逻辑:通过多层非线性变换,让模型自动学习从原始数据到高层特征的映射。为什么多层非线性变换可以达到这样的效果?
  • 【LINUX网络】网络socet接口的基本使用以及实现简易UDP通信
  • Linux内存进阶
  • 七彩喜智慧康养平台:重构银发生活的数字守护网
  • LeetCode 2187.完成旅途的最少时间
  • 数据库连接池(Druid、HikariCP)详解
  • vector模拟实现中的迭代器失效问题
  • SQL等价改写优化
  • 算法打卡22天
  • Codeforces Round 1032 (Div. 3)
  • Excel学习01
  • Arduino入门教程:11、直流步进驱动
  • 小型语言模型(SLMs)有望重塑自主AI:效率、成本与实际部署
  • tensor向量按任意维度进行切片、拆分、组合
  • 如何将缓存存到客户端浏览器上呢
  • 计算机视觉(Computer Vision, CV)
  • 前端实现即时通讯:短轮询、长轮询、SSE 与 WebSocket 全面解析
  • MySQL层级查询实战:无函数实现部门父路径
  • MyBatis 简介
  • 《超级处理器》怎么安装到WPS/excel(最后有下载地址)
  • 基于Spring Boot+Vue的“暖寓”宿舍管理系统设计与实现(源码及文档)
  • 解锁身心密码:从“心”拥抱健康生活
  • 20250619在Ubuntu20.04.6下编译Rockchip瑞芯微原厂的RK3576的Buildroot系统
  • Zephyr boot
  • Three.js WebGL2.0深度应用:解锁图形渲染新极限