Flink 自定义类加载器和子优先类加载策略
子类优先加载
Flink 默认采用了子优先(Child-First)的类加载策略来加载用户代码,以解决潜在的依赖冲突问题。
我们可以通过源码来证明这一点。
ChildFirstClassLoader
的实现
Flink 中负责实现“子优先”加载逻辑的核心类是 ChildFirstClassLoader
。其关键的 loadClassWithoutExceptionHandling
方法定义了类加载的顺序。
// ... existing code ...
public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {/*** The classes that should always go through the parent ClassLoader. This is relevant for Flink* classes, for example, to avoid loading Flink classes that cross the user-code/system-code* barrier in the user-code ClassLoader.*/private final String[] alwaysParentFirstPatterns;public ChildFirstClassLoader(URL[] urls,ClassLoader parent,String[] alwaysParentFirstPatterns,Consumer<Throwable> classLoadingExceptionHandler) {super(urls, parent, classLoadingExceptionHandler);this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;}@Overrideprotected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)throws ClassNotFoundException {// First, check if the class has already been loadedClass<?> c = findLoadedClass(name);if (c == null) {// 1. 检查是否应该“父优先”加载for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {if (name.startsWith(alwaysParentFirstPattern)) {return super.loadClassWithoutExceptionHandling(name, resolve);}}try {// 2. 在子加载器(用户JAR包)中查找类c = findClass(name);} catch (ClassNotFoundException e) {// 3. 如果找不到,再委托给父加载器c = super.loadClassWithoutExceptionHandling(name, resolve);}} else if (resolve) {resolveClass(c);}return c;}
// ... existing code ...
从上面的代码中我们可以清晰地看到:
- 检查父优先例外:首先,代码会检查要加载的类是否匹配
alwaysParentFirstPatterns
列表中的模式(例如java.
、org.apache.flink.
等)。如果匹配,则直接委托给父加载器,确保 Flink 核心类和 JDK 类不会被用户的版本覆盖。 - 子加载器优先:如果不属于父优先的例外情况,它会先尝试调用
findClass(name)
,在自己的 URL 路径(即用户的 JAR 包)中查找类。这正是“Child-First”的核心体现。 - 回退到父加载器:只有当
findClass(name)
抛出ClassNotFoundException
,即在用户 JAR 包中找不到该类时,它才会调用super.loadClassWithoutExceptionHandling(name, resolve)
,将加载任务委托给父加载器。
类加载器的创建
FlinkUserCodeClassLoaders
这个工具类是创建用户代码类加载器的工厂。它根据配置来决定是创建 ChildFirstClassLoader
还是 ParentFirstClassLoader
。
// ... existing code ...public static MutableURLClassLoader create(ResolveOrder resolveOrder,URL[] urls,ClassLoader parent,String[] alwaysParentFirstPatterns,Consumer<Throwable> classLoadingExceptionHandler,boolean checkClassLoaderLeak) {switch (resolveOrder) {case CHILD_FIRST:return childFirst(urls,parent,alwaysParentFirstPatterns,classLoadingExceptionHandler,checkClassLoaderLeak);case PARENT_FIRST:return parentFirst(urls, parent, classLoadingExceptionHandler, checkClassLoaderLeak);default:throw new IllegalArgumentException("Unknown class resolution order: " + resolveOrder);}}/** Class resolution order for Flink URL {@link ClassLoader}. */public enum ResolveOrder {CHILD_FIRST,PARENT_FIRST;public static ResolveOrder fromString(String resolveOrder) {if (resolveOrder.equalsIgnoreCase("parent-first")) {return PARENT_FIRST;} else if (resolveOrder.equalsIgnoreCase("child-first")) {return CHILD_FIRST;} else {throw new IllegalArgumentException("Unknown resolve order: " + resolveOrder);}}}
// ... existing code ...
Flink 的配置项 classloader.resolve-order
的默认值是 child-first
,因此在大多数情况下,Flink 会创建并使用 ChildFirstClassLoader
来加载用户代码。
总结
源码清楚地表明,Flink 通过 ChildFirstClassLoader
实现了子优先的类加载机制:优先在用户 JAR 包中查找类,找不到时才委托给父加载器,同时通过 alwaysParentFirstPatterns
保证了 Flink 自身和核心依赖的稳定性。
MutableURLClassLoader
MutableURLClassLoader
被创建出来主要是为了解决标准 java.net.URLClassLoader
的一些限制,并为 Flink 的类加载机制提供更强的灵活性。它主要扩展了以下两个核心能力:
在 Java 原生的 URLClassLoader
中,addURL(URL url)
方法是 protected
的,这意味着只有其子类或者同一个包下的类才能调用它。这在使用上造成了不便,因为我们无法从外部动态地向一个已存在的 URLClassLoader
实例中添加新的类路径(比如新的 JAR 包)。
MutableURLClassLoader
通过重写 addURL
方法并将其访问权限修改为 public
,解决了这个问题。
// ... existing code ...
@Internal
public abstract class MutableURLClassLoader extends URLClassLoader {// ... existing code ...public MutableURLClassLoader(URL[] urls, ClassLoader parent) {super(urls, parent);}@Overridepublic void addURL(URL url) {super.addURL(url);}/**
// ... existing code ...*/public abstract MutableURLClassLoader copy();
}
这样做之后,Flink 框架的任何部分都可以在运行时向这个 ClassLoader 实例中动态添加 JAR 文件的 URL,从而加载新的类。这对于需要动态加载用户代码和依赖的场景(例如 SQL Client 提交一个带 UDF 的 JAR 包)至关重要。
例如,在 flink-table-runtime
模块的 SqlDriver
中,就利用了这个特性来动态加载执行器相关的 JAR:
// ... existing code ...private static ClassLoader getClassLoader() throws Exception {MutableURLClassLoader sqlGatewayClassloader =(MutableURLClassLoader) Thread.currentThread().getContextClassLoader();try {sqlGatewayClassloader.loadClass(RUNNER_CLASS_NAME);LOG.info("Load {} from the classpath.", RUNNER_CLASS_NAME);} catch (ClassNotFoundException e) {LOG.info("{} is not in the classpath. Finding...", RUNNER_CLASS_NAME);sqlGatewayClassloader.addURL(findExecutor().toUri().toURL());}return sqlGatewayClassloader;}
// ... existing code ...
定义 copy
抽象方法
MutableURLClassLoader
还定义了一个抽象方法 copy()
。
// ... existing code .../*** Copy the classloader for each job and these jobs can add their jar files to the classloader* independently.** @return the copied classloader*/public abstract MutableURLClassLoader copy();
}
该方法的 Javadoc 注释解释了其用途:“为每个作业复制类加载器,这些作业可以独立地将其 JAR 文件添加到类加载器中。”
在 Flink 中,不同的作业(Job)可能需要不同的依赖库。为了实现作业间的类路径隔离,避免依赖冲突,一个常见的做法是为每个作业创建一个独立的 ClassLoader。copy()
方法就是这个机制的体现,它强制所有子类实现一个复制自身的能力,从而可以方便地为新作业创建一个拥有相同初始类路径但又相互独立的 ClassLoader 实例。
例如,ChildFirstClassLoader
就实现了这个 copy
方法:
// ... existing code ...@Overridepublic MutableURLClassLoader copy() {return new ChildFirstClassLoader(getURLs(), getParent(), alwaysParentFirstPatterns, classLoadingExceptionHandler);}
}
总结
总而言之,MutableURLClassLoader
是 Flink 自定义类加载体系的一个重要基类。它通过公开 addURL
方法提供了在运行时动态修改类路径的灵活性,并通过强制实现 copy
方法来支持为不同作业创建隔离的类加载环境,从而更好地满足了 Flink 作为分布式计算框架对类加载的复杂需求。
FlinkUserCodeClassLoader
FlinkUserCodeClassLoader
是 Flink 中用于加载用户代码(例如,作业的 JAR 包、UDF 等)的核心抽象基类。它继承自我们之前讨论过的 MutableURLClassLoader
,因此它具备了动态添加 URL 和被复制的能力。在此基础上,它增加了更精细的控制和扩展能力。
FlinkUserCodeClassLoader
最核心的增强是引入了一套可定制的异常处理机制。
- 它定义了一个
Consumer<Throwable> classLoadingExceptionHandler
字段。 - 在构造函数中,可以传入一个具体的异常处理器。如果未提供,则使用默认的
NOOP_EXCEPTION_HANDLER
(即什么也不做)。
这个机制是通过重写 loadClass
方法实现的,这也是这个类的设计精髓所在:
// ... existing code ...@Overridepublic final Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {try {synchronized (getClassLoadingLock(name)) {return loadClassWithoutExceptionHandling(name, resolve);}} catch (Throwable classLoadingException) {classLoadingExceptionHandler.accept(classLoadingException);throw classLoadingException;}}/*** Same as {@link #loadClass(String, boolean)} but without exception handling.** <p>Extending concrete class loaders should implement this instead of {@link* #loadClass(String, boolean)}.*/protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)throws ClassNotFoundException {return super.loadClass(name, resolve);}
// ... existing code ...
可以看到:
loadClass
方法被声明为final
,这意味着子类不能重写它。这保证了所有 Flink 用户代码类加载器的行为一致性。- 它内部使用
try-catch
块包裹了实际的类加载逻辑loadClassWithoutExceptionHandling
。 - 当捕获到任何
Throwable
(包括ClassNotFoundException
等)时,它会首先调用classLoadingExceptionHandler
来处理这个异常,然后再将异常抛出。
这提供了一个强大的钩子(Hook),使得 Flink 可以在类加载失败时执行一些自定义逻辑,例如记录详细日志、发送告警等,而无需修改每个具体的类加载器实现。
设计模式:模板方法(Template Method)
FlinkUserCodeClassLoader
完美地运用了模板方法设计模式。
- 模板方法:
public final Class<?> loadClass(...)
定义了类加载的整体骨架和算法流程(加锁 -> 加载 -> 异常处理)。这个流程是固定不变的。 - 抽象/钩子方法:
protected Class<?> loadClassWithoutExceptionHandling(...)
是留给子类去实现的具体步骤。子类通过重写这个方法来定义自己独特的加载策略(例如,是父类优先还是子类优先)。
例如,ChildFirstClassLoader
会重写 loadClassWithoutExceptionHandling
来实现先从自己的 URL 中查找类,找不到再去父加载器中查找的逻辑。而 ParentFirstClassLoader
则可以直接使用默认实现,即调用 super.loadClass
,这本身就是父类优先的逻辑。
这种设计将通用的、不变的逻辑(如线程同步、异常处理)与易变的、具体的逻辑(加载顺序)解耦,使得整个类加载体系更加清晰和易于扩展。
在 Flink 生态中的角色
- 用户代码隔离的基石:正如其名,它是加载所有用户代码的专用 ClassLoader。Flink 通过为不同的作业或任务创建不同的
FlinkUserCodeClassLoader
实例,实现了依赖隔离。 - 加载策略的统一入口:无论是
parent-first
还是child-first
策略,都是通过FlinkUserCodeClassLoaders
这个工厂类创建出不同的FlinkUserCodeClassLoader
子类实例来实现的。 - 安全网机制:如
SafetyNetWrapperClassLoader
所示,它可以包裹一个FlinkUserCodeClassLoader
实例,以防止 ClassLoader 泄漏。这也体现了其作为 Flink 类加载核心组件的地位。
总结
FlinkUserCodeClassLoader
在 MutableURLClassLoader
的基础上,通过模板方法模式增加了两个关键特性:
- 统一的、可定制的异常处理机制,增强了系统的可观测性和健壮性。
- 规范化的类加载流程,强制所有子类遵循固定的加载骨架(如线程安全),同时又允许它们灵活定义具体的加载顺序。
它是 Flink 实现复杂、健壮且灵活的类加载机制的核心基类。
ChildFirstClassLoader
ChildFirstClassLoader
是 Flink 中实现**子优先(Child-First)**类加载策略的具体实现,也是 Flink 默认的用户代码加载器。它继承自 FlinkUserCodeClassLoader
。
"子优先"(也称为“反向类加载”)意味着在加载一个类时,会优先在子加载器(即用户代码的 JAR 包)中进行搜索,只有在找不到的情况下,才会委托给父加载器(通常是 Flink 的应用类加载器)进行搜索。
这种机制是 Flink 实现用户代码与 Flink 框架本身依赖隔离的关键。它允许用户在自己的作业 JAR 包中打包任意版本的依赖库,即使这些库与 Flink 内部使用的版本冲突,也能正常运行。例如,用户作业可以使用 Guava 29.0,而 Flink 框架可能使用 Guava 18.0,两者不会互相干扰。
下面我们结合代码,分模块来详细解析 ChildFirstClassLoader
的实现。
// ... existing code ...
public final class ChildFirstClassLoader extends FlinkUserCodeClassLoader {/*** The classes that should always go through the parent ClassLoader. This is relevant for Flink* classes, for example, to avoid loading Flink classes that cross the user-code/system-code* barrier in the user-code ClassLoader.*/private final String[] alwaysParentFirstPatterns;public ChildFirstClassLoader(URL[] urls,ClassLoader parent,String[] alwaysParentFirstPatterns,Consumer<Throwable> classLoadingExceptionHandler) {super(urls, parent, classLoadingExceptionHandler);this.alwaysParentFirstPatterns = alwaysParentFirstPatterns;}
// ... existing code ...
alwaysParentFirstPatterns
: 这是ChildFirstClassLoader
的一个核心属性。它是一个字符串数组,定义了一组类名前缀模式。凡是匹配这些模式的类,都不会遵循“子优先”的策略,而是会被强制**总是使用父优先(Parent-First)**的策略加载。- 为什么需要这个例外? 因为有些类是 Flink 框架和用户代码之间交互的“接口”或“桥梁”。如果这些接口类被用户代码的 ClassLoader 和 Flink 框架的 ClassLoader 分别加载一次,就会产生两个完全不同(尽管类名相同)的
Class
对象。这会导致ClassCastException
(类型转换异常,例如 "X cannot be cast to X")。因此,必须确保这些核心接口类在整个 JVM 中只被加载一次,并且是由父加载器加载。典型的例子包括 Flink 的核心 API (org.apache.flink.*
)、Java 的核心库 (java.*
,javax.*
) 等。这些模式可以通过 Flink 的配置文件flink-conf.yaml
中的classloader.parent-first-patterns
来配置。
核心逻辑:loadClassWithoutExceptionHandling
这是实现“子优先”加载策略的核心所在。它重写了父类 FlinkUserCodeClassLoader
的模板方法。
// ... existing code ...@Overrideprotected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)throws ClassNotFoundException {// 步骤 1: 检查类是否已经被加载过Class<?> c = findLoadedClass(name);if (c == null) {// 步骤 2: 检查是否匹配“父优先”模式for (String alwaysParentFirstPattern : alwaysParentFirstPatterns) {if (name.startsWith(alwaysParentFirstPattern)) {// 如果匹配,直接委托给父类加载(即标准的父优先逻辑)return super.loadClassWithoutExceptionHandling(name, resolve);}}try {// 步骤 3: 尝试在子加载器(自己的URL)中查找类c = findClass(name);} catch (ClassNotFoundException e) {// 步骤 4: 如果在子加载器中找不到,再委托给父类加载c = super.loadClassWithoutExceptionHandling(name, resolve);}} else if (resolve) {resolveClass(c);}return c;}
// ... existing code ...
加载流程可以分解为以下几个步骤:
- 检查缓存: 首先调用
findLoadedClass(name)
,检查这个类是否已经被当前的 ClassLoader 加载过了。这是 JVM 类加载的标准第一步,避免重复加载。 - 检查“父优先”例外: 如果类尚未加载,则遍历
alwaysParentFirstPatterns
列表。如果类名name
匹配了其中任何一个模式,就立即中断“子优先”逻辑,直接调用super.loadClassWithoutExceptionHandling(name, resolve)
。这会触发标准的双亲委派模型,即从父加载器开始向上查找。 - 子加载器查找: 如果类名不匹配任何“父优先”模式,则执行“子优先”的核心逻辑:调用
findClass(name)
。这个方法会只在当前 ClassLoader 的 URL 列表(即用户 JAR 包)中查找类。 - 委托父加载器: 如果
findClass(name)
抛出ClassNotFoundException
(意味着在用户 JAR 包中没找到这个类),则在catch
块中,最后再调用super.loadClassWithoutExceptionHandling(name, resolve)
,将加载任务委托给父加载器。
这个逻辑清晰地实现了“子优先,但有例外”的加载策略。
资源加载:getResource
和 getResources
加载资源(如配置文件)的逻辑与加载类相似,也需要遵循“子优先”的原则。
getResource(String name)
// ... existing code ...@Overridepublic URL getResource(String name) {// 优先在子加载器(自己的URL)中查找资源URL urlClassLoaderResource = findResource(name);if (urlClassLoaderResource != null) {return urlClassLoaderResource;}// 如果找不到,再委托给父加载器return super.getResource(name);}
// ... existing code ...
这个实现非常直接:先调用 findResource(name)
在自己的 URL 中查找,如果找到了就立刻返回;否则,调用 super.getResource(name)
委托给父加载器。
getResources(String name)
这个方法用于查找所有同名的资源,并返回一个 Enumeration
。
// ... existing code ...@Overridepublic Enumeration<URL> getResources(String name) throws IOException {// 1. 先获取所有子加载器中的资源Enumeration<URL> urlClassLoaderResources = findResources(name);final List<URL> result = new ArrayList<>();while (urlClassLoaderResources.hasMoreElements()) {result.add(urlClassLoaderResources.nextElement());}// 2. 再获取所有父加载器中的资源Enumeration<URL> parentResources = getParent().getResources(name);while (parentResources.hasMoreElements()) {result.add(parentResources.nextElement());}// 3. 将两者合并,并返回一个新的 Enumerationreturn new Enumeration<URL>() {Iterator<URL> iter = result.iterator();public boolean hasMoreElements() {return iter.hasNext();}public URL nextElement() {return iter.next();}};}
// ... existing code ...
这里的实现保证了返回的资源列表中,来自子加载器的资源总是排在来自父加载器的资源前面。
总结
ChildFirstClassLoader
是 Flink 实现依赖隔离和灵活部署的关键组件。它通过重写类和资源的加载方法,颠覆了 Java 默认的双亲委派模型,实现了**子优先(Child-First)**的加载顺序。同时,通过 alwaysParentFirstPatterns
配置项保留了一个“后门”,允许特定的、必须在框架和用户代码间共享的类库继续使用父优先加载,从而在灵活性和稳定性之间取得了平衡。
ParentFirstClassLoader
ParentFirstClassLoader
是 FlinkUserCodeClassLoaders
的一个静态内部类,它实现了父优先(Parent-First)的类加载策略。这种策略是 Java 标准的双亲委派模型的直接体现。
// ... existing code .../*** Regular URLClassLoader that first loads from the parent and only after that from the URLs.*/@Internalpublic static class ParentFirstClassLoader extends FlinkUserCodeClassLoader {ParentFirstClassLoader(URL[] urls, ClassLoader parent, Consumer<Throwable> classLoadingExceptionHandler) {super(urls, parent, classLoadingExceptionHandler);}static {ClassLoader.registerAsParallelCapable();}@Overridepublic MutableURLClassLoader copy() {return new ParentFirstClassLoader(getURLs(), getParent(), classLoadingExceptionHandler);}}
// ... existing code ...
- 继承关系: 它继承自
FlinkUserCodeClassLoader
。这意味着它拥有了FlinkUserCodeClassLoader
的所有特性,包括可定制的异常处理机制和作为MutableURLClassLoader
的动态添加 URL 及copy
的能力。 - 构造函数: 它的构造函数非常简单,只是直接调用了父类
FlinkUserCodeClassLoader
的构造函数,传递了必要的参数。 copy()
方法: 它实现了copy()
方法,用于创建一个具有相同 URL、父加载器和异常处理器的新实例。
核心加载逻辑(缺失的 loadClassWithoutExceptionHandling
)
您可能会注意到,ParentFirstClassLoader
没有重写 loadClassWithoutExceptionHandling
方法。这正是它实现“父优先”策略的关键所在。
我们回顾一下它的父类 FlinkUserCodeClassLoader
中的这个方法:
// ... existing code ...protected Class<?> loadClassWithoutExceptionHandling(String name, boolean resolve)throws ClassNotFoundException {return super.loadClass(name, resolve);}
// ... existing code ...
由于 ParentFirstClassLoader
没有重写它,那么当调用 loadClass
时,它会执行 FlinkUserCodeClassLoader
中的默认实现,即 return super.loadClass(name, resolve)
。这里的 super
指向的是 URLClassLoader
。
URLClassLoader
的 loadClass
方法遵循标准的双亲委派模型:
- 检查类是否已加载。
- 如果未加载,将加载请求委托给父加载器。
- 只有当父加载器(以及父加载器的父加载器,一直到 Bootstrap ClassLoader)都无法加载该类时,才会尝试在自己的 URL 路径中查找(即调用
findClass
)。
因此,ParentFirstClassLoader
通过不重写核心加载方法,天然地继承并实现了标准的“父优先”加载逻辑。
为什么还需要“父优先”?到底用哪个?
这是一个非常好的问题,触及了 Flink 类加载设计的核心权衡。
虽然 Flink 默认并推荐使用 ChildFirstClassLoader
来解决依赖冲突,但在某些特定场景下,“父优先”仍然是必要或更优的选择:
- 简单场景和测试: 在一些简单的作业或测试环境中,用户代码的依赖非常清晰,与 Flink 框架没有冲突。在这种情况下,使用标准的“父优先”模型更简单、更符合 Java 的传统习惯,也更容易理解和调试。
- 避免行为不一致: 当用户代码严重依赖于 Flink 框架提供的某个特定版本的库时,如果使用“子优先”并意外地打包了一个不兼容的版本,可能会导致难以预料的运行时错误。此时,明确指定“父优先”可以确保用户代码和 Flink 框架使用完全相同的依赖库,保证行为的一致性。
- 向后兼容和用户选择: Flink 作为一个成熟的框架,需要提供选项以适应不同的用户需求和历史遗留问题。在早期版本或某些特定部署模式下,用户可能已经习惯了“父优先”的行为。提供这个选项可以平滑过渡,并给予用户控制权。如
flink-1.10
的发布说明中就提到了这一点,客户端开始遵循类加载策略,对于希望保持旧行为的用户,可以显式配置为parent-first
。
到底用哪个?
这取决于你的具体需求和场景,选择的原则是:
默认和推荐使用
child-first
:- 当你开发复杂的 Flink 作业,引入了大量第三方依赖(如 Hadoop、HBase、各种 Connectors 的库)。
- 当你不确定你的依赖是否会与 Flink 的内部依赖冲突时。
- 绝大多数生产场景下,都应该使用
child-first
。这是 Flink 解决“依赖地狱”(Dependency Hell)问题的标准方案。
在特定情况下选择
parent-first
:- 当你的作业非常简单,几乎没有外部依赖,或者你明确知道所有依赖都与 Flink 兼容。
- 当你遇到由“子优先”引起的、难以解决的
ClassCastException
或LinkageError
,并且你希望强制作业使用 Flink 环境提供的依赖版本时。 - 在进行某些单元测试或集成测试时,为了简化环境,可能会选择“父优先”。
如何控制使用哪个?
可以从多个角度证明 child-first
是 Flink 的默认类加载行为。
在软件工程中,测试代码是验证默认行为最可靠的证据之一。在 Flink 的客户端测试中,有一个测试用例明确地断言了默认配置不是 parent-first
。
在 DefaultPackagedProgramRetrieverITCase.java
文件中,有如下测试代码:
// ... existing code ...@Testvoid testConfigurationIsConsidered() throws FlinkException {final String parentFirstConfigValue = "parent-first";// we want to make sure that parent-first is not set as a defaultassertThat(CoreOptions.CLASSLOADER_RESOLVE_ORDER.defaultValue()).isNotEqualTo(parentFirstConfigValue);final Configuration configuration = new Configuration();
// ... existing code ...
这段代码的核心是 assertThat(CoreOptions.CLASSLOADER_RESOLVE_ORDER.defaultValue()).isNotEqualTo(parentFirstConfigValue);
。它断言了 CLASSLOADER_RESOLVE_ORDER
这个配置项的默认值不等于 "parent-first"。
我们知道这个配置项只有两个可选值:child-first
和 parent-first
。既然测试证明了默认值不是 parent-first
,那么它必然是 child-first
。
官方文档 debugging_classloading.md
也明确指出了这一点。
// ... existing code ...
对于用户代码的类加载,可以通过调整Flink的[`classloader.resolve-order`]({{< ref "docs/deployment/config" >}}#classloader-resolve-order)配置将ClassLoader解析顺序还原至Java的默认模式(从Flink默认的`child-first`调整为`parent-first`)。
// ... existing code ...
括号里的内容“(从Flink默认的child-first
调整为parent-first
)”直接说明了 Flink 的默认设置是 child-first
。
查看 Flink 1.10 的发布说明,我们可以了解到这个默认值变更的历史背景。
flink-1.10.md
// ... existing code ...
# Release Notes - Flink 1.10
### Clusters & Deployment
#### Flink Client respects Classloading Policy
##### [FLINK-13749](https://issues.apache.org/jira/browse/FLINK-13749)The Flink client now also respects the configured classloading policy, i.e.,
`parent-first` or `child-first` classloading. Previously, only cluster
components such as the job manager or task manager supported this setting.
This does mean that users might get different behaviour in their programs, in
which case they should configure the classloading policy explicitly to use
`parent-first` classloading, which was the previous (hard-coded) behaviour.
// ... existing code ...
这段说明指出,从 Flink 1.10 开始,客户端也开始遵循可配置的类加载策略。它建议,如果用户发现行为有变,可以显式地配置回 parent-first
,因为那是之前硬编码的行为。这暗示了从这个版本开始,默认的行为已经不再是 parent-first
,而是可配置的 child-first
。
综合以上来自测试代码、官方文档和版本历史的三个方面的证据,我们可以非常确定地得出结论:child-first
是当前 Flink 版本中默认的类加载策略。这是 Flink 为了更好地解决用户作业与框架之间的依赖冲突而做出的重要设计决策。
FlinkUserCodeClassLoaders
这个工厂类就是根据配置项来创建对应 ClassLoader 实例的:
// ... existing code ...public static MutableURLClassLoader create(ResolveOrder resolveOrder,
// ... existing code ...switch (resolveOrder) {case CHILD_FIRST:return childFirst(
// ... existing code ...case PARENT_FIRST:return parentFirst(
// ... existing-code ...}}
// ... existing code ...
总结
ParentFirstClassLoader
是 Flink 对标准 Java 双亲委派模型的一个直接实现,它通过不重写关键的加载方法来达成“父优先”的目的。
它与 ChildFirstClassLoader
共同构成了 Flink 灵活的类加载体系,为用户提供了两种选择:
child-first
(默认): 解决依赖冲突,适用于复杂的生产环境。parent-first
: 遵循 Java 标准,适用于简单场景或需要强制依赖统一的特殊情况。
用户可以根据自己的作业复杂度和依赖情况,通过 classloader.resolve-order
配置来选择最适合的加载策略。
SafetyNetWrapperClassLoader
SafetyNetWrapperClassLoader
是 FlinkUserCodeClassLoaders
的一个静态内部类。从名字就可以看出,它是一个“安全网”包装器。它的核心目标是解决一个在长时间运行的 JVM 应用中非常棘手的问题:类加载器泄漏,以及由此导致的 OutOfMemoryError: Metaspace
。
在 Flink 的 Session 集群模式下,用户的作业代码(JARs)是动态加载的。每个作业都有自己的 FlinkUserCodeClassLoader
。当作业结束后,这个 ClassLoader 以及它加载的所有类(包括用户代码、依赖库等)都应该能被垃圾回收器(GC)回收,从而释放 Metaspace 内存。
问题在于,有些用户代码或其依赖的第三方库可能会在静态字段(static field)中持有对当前线程上下文类加载器(Thread.contextClassLoader
)的引用。当 Flink 执行用户代码时,会把线程上下文类加载器设置为该作业的 FlinkUserCodeClassLoader
。如果这个引用在作业结束后没有被清理,就会导致一个从静态变量出发的强引用链,使得 FlinkUserCodeClassLoader
无法被 GC 回收。随着作业的不断提交和结束,泄漏的 ClassLoader 会越来越多,最终耗尽 Metaspace。
SafetyNetWrapperClassLoader
就是为了打破这个潜在的引用链,充当一个“保险丝”或“安全网”。
类的定义与核心结构
// ... existing code .../*** Ensures that holding a reference on the context class loader outliving the scope of user code* does not prevent the user classloader to be garbage collected (FLINK-16245).** <p>This classloader delegates to the actual user classloader. Upon {@link #close()}, the* delegate is nulled and can be garbage collected. Additional class resolution will be resolved* solely through the bootstrap classloader and most likely result in ClassNotFound exceptions.*/@Internalpublic static class SafetyNetWrapperClassLoader extends MutableURLClassLoader {private static final Logger LOG =LoggerFactory.getLogger(SafetyNetWrapperClassLoader.class);protected volatile FlinkUserCodeClassLoader inner;protected SafetyNetWrapperClassLoader(FlinkUserCodeClassLoader inner, ClassLoader parent) {super(new URL[0], parent);this.inner = inner;}
// ... existing code ...
- 继承关系: 它继承自
MutableURLClassLoader
,所以它本身也是一个功能完备的 ClassLoader。 - 包装器模式 (Wrapper/Decorator Pattern): 它的核心是持有一个
inner
字段,这个字段指向真正的用户代码类加载器(比如ChildFirstClassLoader
或ParentFirstClassLoader
)。 - 构造函数: 构造函数接收真正的
inner
ClassLoader,并将其保存起来。注意super(new URL[0], parent)
,这个包装器自身并不管理任何 URL,它只是一个空壳。
委托与关闭
SafetyNetWrapperClassLoader
的所有类加载相关的方法,都只是简单地委托给内部的 inner
加载器来完成。
// ... existing code ...@Overridepublic Class<?> loadClass(String name) throws ClassNotFoundException {return ensureInner().loadClass(name);}@Overrideprotected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {// called for dynamic class loadingreturn ensureInner().loadClass(name, resolve);}@Overridepublic void addURL(URL url) {ensureInner().addURL(url);}@Overridepublic URL getResource(String name) {return ensureInner().getResource(name);}
// ... (其他委托方法) ...
这里的关键在于 ensureInner()
和 close()
方法。
ensureInner()
方法:
// ... existing code ...private FlinkUserCodeClassLoader ensureInner() {if (inner == null) {throw new IllegalStateException("Trying to access closed classloader. Please check if you store "+ "classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak "+ "occurs in a third party library and cannot be fixed immediately, you can disable this check "+ "with the configuration '"+ CoreOptions.CHECK_LEAKED_CLASSLOADER.key()+ "'.");}return inner;}
在每次委托之前,它都会检查 inner
是否为 null
。如果不为 null
,就返回 inner
实例;如果为 null
,则抛出异常,并给出非常详细和有用的提示信息,指导用户如何排查类加载器泄漏问题。
close()
方法:
// ... existing code ...@Overridepublic void close() {final FlinkUserCodeClassLoader inner = this.inner;if (inner != null) {try {inner.close();} catch (IOException e) {LOG.warn("Could not close user classloader", e);}}this.inner = null;}
// ... existing code ...
这是“安全网”机制的核心。当 Flink 认为一个用户代码类加载器生命周期结束时(例如,作业完成),它会调用 close()
方法。这个方法做了两件事:
- 调用
inner
ClassLoader 自己的close()
方法(如果需要释放资源)。 - 将
this.inner
设置为null
。这是最关键的一步!
"安全网"如何工作
- 包装: Flink 在创建用户代码类加载器时,如果开启了泄漏检查,并不会直接使用
ChildFirstClassLoader
,而是用SafetyNetWrapperClassLoader
将其包装起来。// ... existing code ... private static MutableURLClassLoader wrapWithSafetyNet(FlinkUserCodeClassLoader classLoader, boolean check) {return check? new SafetyNetWrapperClassLoader(classLoader, classLoader.getParent()): classLoader; } // ... existing code ...
- 设置上下文: Flink 将这个
SafetyNetWrapperClassLoader
实例设置为Thread.contextClassLoader
。 - 泄漏发生: 假设有问题的代码持有了对
SafetyNetWrapperClassLoader
实例的静态引用。 - 作业结束: Flink 调用
SafetyNetWrapperClassLoader
的close()
方法。 - 引用断开:
close()
方法将inner
字段置为null
。 - GC 生效: 现在,即使那个静态引用依然存在,它也仅仅指向一个轻量级的、几乎为空的
SafetyNetWrapperClassLoader
对象。而这个包装器对象内部已经不再引用重量级的、包含了所有用户代码和依赖的ChildFirstClassLoader
。因此,ChildFirstClassLoader
及其加载的所有类都没有了强引用,可以被 GC 正常回收。
通过这种方式,SafetyNetWrapperClassLoader
像一个“熔断器”,主动断开了泄漏的引用链,从而保护了系统的 Metaspace 不被耗尽。
总结
SafetyNetWrapperClassLoader
是 Flink 框架健壮性的一个重要体现。它通过一个巧妙的包装器/委托模式,解决了由第三方库或用户代码行为不当引起的类加载器泄漏问题。
- 功能:作为实际用户代码类加载器的代理,转发所有请求。
- 核心机制:通过
close()
方法将内部对实际类加载器的引用置为null
。 - 目的:打破因静态字段引用导致的类加载器泄漏链,确保用户代码类加载器在作业结束后能被垃圾回收,防止
Metaspace
内存溢出。 - 用户体验:当泄漏的 ClassLoader 被再次使用时,它会抛出带有明确指导信息的异常,帮助用户定位和修复问题。这个检查可以通过配置项
classloader.check-leaked-classloader
来开启或关闭。