spring-ai-alibaba 之 graph 槽点
函数式编程
spring-ai-alibaba-graph-core 的代码也学习有一段时间了,有些地方确实还不尽如人意
其中有些底层设计感觉确实不太符合java开发人员的习惯,不太容易理解
究其根源,个人感觉应该是照搬 langgraph 的逻辑,强行使用函数式编程,没有进行java面向对象的适配导致的(反正我是不承认我不习惯大范围使用 java 的函数式编程的 :))
毕竟在 langgraph 的官网介绍中都说了,Nodes 和 Edges 仅仅是 Python 函数
但做 Java 开发从没听过“仅仅是 Java Lambda表达式”或者“仅仅是Java Function”的说法,类比应该是“仅仅是 Java 类”或“仅仅是 Java 接口”
举个例子
原先 langgraph 的使用逻辑可能类似下面这样
def chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}llmNode = Node(chatbot)
graph_builder.add_node("chatbot", llmNode)
在python中,函数本身也是一种对象,可以当做参数进行传递
但是在java中,如果要完全复制这种操作,就需要引入Function等函数式编程,进行一层封装
NodeAction llmAction = overallState -> {Map<String, Object> map = ...//业务逻辑return map;
}Node llmNode = new Node(llmAction)
Node及NodeAction的定义如下,其中NodeAction是一个继承了Function接口的接口
public class Node {@FunctionalInterfacepublic interface NodeAction {Map<String, Object> apply(OverAllState state) throws Exception;}private final String id;private final NodeAction actionFactory;
}
但是这不够,在python中一个函数就能将参数转为异步
def chatbot(state: State):return {"messages": [llm.invoke(state["messages"])]}llmNode = Node(chatbot)
graph_builder.add_node("chatbot", async(llmNode))
那怎么办?spring-ai-alibaba-graph-core 为了支持这种写法,在外面又套了一层
public class Node {@FunctionalInterfacepublic interface AsyncNodeAction extends Function<OverAllState, CompletableFuture<Map<String, Object>>> {CompletableFuture<Map<String, Object>> apply(OverAllState state);@FunctionalInterfacepublic interface NodeAction {Map<String, Object> apply(OverAllState state) throws Exception;}static AsyncNodeAction node_async(NodeAction syncAction);}private final String id;private final AsyncNodeAction asyncActionFactory;
}
然后,python还支持多参数,即节点函数的入参包括State和RunnableConfig
def chatbot(state: State, runnableConfig: RunnableConfig):return {"messages": [llm.invoke(state["messages"])]}llmNode = Node(chatbot)
graph_builder.add_node("chatbot", async(llmNode))
那就在AsyncNodeAction外面再套一层AsyncNodeActionWithConfig,继承BiFunction
具体代码我就不贴了,后面还又套了一层ActionFactory
总之,让习惯java写法的人非常不习惯
如果是java开发一般会怎么写?一个接口搞定
public interface GraphNode {Map<String, Object> action(OverAllState state);default Map<String, Object> action(OverAllState state, RunnableConfig config) {return action(state);}default CompletableFuture<Map<String, Object>> asyncAction(OverAllState state) {return CompletableFuture.supplyAsync(() -> action(state));}default CompletableFuture<Map<String, Object>> asyncAction(OverAllState state, RunnableConfig config) {return CompletableFuture.supplyAsync(() -> action(state, config));}}
甚至普通节点都不需要异步,异步放到并行节点里去就ok了
public interface GraphNode {Map<String, Object> action(OverAllState state);default Map<String, Object> action(OverAllState state, RunnableConfig config) {return action(state);}}public interface ParallelNode extends GraphNode {default CompletableFuture<Map<String, Object>> asyncAction(OverAllState state) {return CompletableFuture.supplyAsync(() -> action(state));}default CompletableFuture<Map<String, Object>> asyncAction(OverAllState state, RunnableConfig config) {return CompletableFuture.supplyAsync(() -> action(state, config));}}
边其实也存在同样的问题,要封装为函数式的EdgeAction来操作
异步
在graph的代码中可以看到大量的CompletableFuture,但实际跟踪invoke代码执行会发现,这边刚提交了异步,转头就调用CompletableFuture的get方法阻塞在那了,完全失去了异步的作用,并没有与其他代码并行
可以说整个graph可能就只有ParallelNode那里有一点点并行
而且graph,也就是图,或者叫状态机,本身的机制就是一个节点执行完,才会进入下一个节点,本身就带有很强的顺序执行的特性,完全没必要搞一堆CompletableFuture,只需要在并行节点内部进行一点特殊处理就ok了
如果实在想异步处理节点,需要自行调用 CompiledGraph 的 stream 方法,然后实现一个异步处理器(可以参考spring-ai-alibaba-deepsearch 的 GraphProcess 类)