1. Spring Batch简介

企业域内的许多应用程序都需要批量处理才能在关键任务环境中执行业务操作。这些业务包括:

  • 自动,复杂地处理大量信息,无需用户交互即可最有效地进行处理。这些操作通常包括基于时间的事件(例如月末计算,通知或通信)。

  • 定期应用非常大的数据集(例如,保险利益确定或费率调整)重复处理复杂的业务规则。

  • 从内部和外部系统接收的信息的集成,通常需要格式化,验证和以事务方式进行的处理到记录系统中。批处理用于每天为企业处理数十亿笔交易。

Spring Batch是一个轻量级的,全面的批处理框架,旨在使鲁棒的批处理应用程序的开发对于企业系统的日常运行至关重要。Spring Batch建立在人们期望的Spring框架特性(生产力,基于POJO的开发方法和普遍的易用性)的基础上,同时使开发人员在必要时可以轻松访问和利用更高级的企业服务。Spring Batch不是一个调度框架。商业空间和开放源代码空间中都有许多好的企业调度程序(例如Quartz,Tivoli,Control-M等)。它旨在与计划程序一起工作,而不是替换计划程序。

Spring Batch提供了可重用的功能,这些功能对于处理大量记录至关重要,包括日志记录/跟踪,事务管理,作业处理统计信息,作业重新启动,跳过和资源管理。它还提供了更高级的技术服务和功能,这些功能可以通过优化和分区技术实现超大量和高性能的批处理作业。Spring Batch可用于简单的用例(例如,将文件读入数据库或运行存储过程)以及复杂的大量用例(例如,在数据库之间移动大量数据,对其进行转换等)。上)。大量批处理作业可以高度可扩展的方式利用框架来处理大量信息。

1.1.背景

尽管开源软件项目和相关社区将更多的注意力集中在基于Web和基于微服务的体系结构框架上,但是仍然存在着对可重用体系结构框架的关注,以适应基于Java的批处理需求,尽管仍然需要继续处理此类问题。在企业IT环境中进行处理。缺乏标准的,可重复使用的批处理体系结构,导致在客户端企业IT功能内开发的许多一次性内部解决方案激增。

SpringSource(现为Pivotal)和埃森哲合作改变了这一点。埃森哲在实现批处理体系结构方面的动手行业和技术经验,SpringSource的深厚技术经验以及Spring久经验证的编程模型共同构成了一种自然而强大的合作伙伴关系,以创建旨在填补企业Java重要缺口的高质量,与市场相关的软件。两家公司都与许多客户合作,他们通过开发基于Spring的批处理体系结构解决方案来解决类似的问题。这提供了一些有用的附加细节和现实生活中的约束条件,有助于确保解决方案可以应用于客户提出的现实问题。

埃森哲为Spring Batch项目贡献了以前专有的批处理架构框架,以及用于推动支持,增强功能和现有功能集的提交者资源。埃森哲的贡献基于数十年来在使用最后几代平台构建批处理体系结构方面的经验:COBOL / Mainframe,C ++ / Unix,以及现在的Java / anywhere。

埃森哲与SpringSource之间的合作旨在促进软件处理方法,框架和工具的标准化,企业用户在创建批处理应用程序时可以始终利用它们。希望为企业IT环境提供标准的,经过验证的解决方案的公司和政府机构可以从Spring Batch中受益。

1.2.使用场景

典型的批处理程序通常:

  • 从数据库,文件或队列中读取大量记录。

  • 以某种方式处理数据。

  • 以修改后的形式写回数据。

Spring Batch自动执行此基本批处理迭代,提供了将一组类似的交易作为一组处理的功能,通常在脱机环境中无需任何用户交互。批处理作业是大多数IT项目的一部分,Spring Batch是唯一提供可靠的企业级解决方案的开源框架。

业务场景

  • 定期提交批处理

  • 并行批处理:作业的并行处理

  • 分阶段的企业消息驱动的处理

  • 大规模并行批处理

  • 失败后手动或计划重启

  • 顺序处理相关步骤(扩展了工作流程驱动的批次)

  • 部分处理:跳过记录(例如,回滚时)

  • 整批交易,适用于小批量或现有存储过程/脚本的情况

技术目标

  • 批处理开发人员使用Spring编程模型:专注于业务逻辑,并让框架处理基础结构。

  • 在基础结构,批处理执行环境和批处理应用程序之间明确分离关注点。

  • 提供通用的核心执行服务作为所有项目都可以实现的接口。

  • 提供可以直接使用的核心执行接口的简单和默认实现。

  • 通过在所有层中利用spring框架,轻松配置,定制和扩展服务。

  • 所有现有的核心服务应易于替换或扩展,而不会影响基础架构层。

  • 提供一个简单的部署模型,其架构JAR与使用Maven构建的应用程序完全分开。

1.3.Spring Batch 架构

Spring Batch在设计时考虑了可扩展性,并考虑了各种最终用户。下图显示了支持最终用户开发人员的可扩展性和易用性的分层体系结构。

图1.1:Spring Batch分层架构
图1. Spring Batch分层架构

这种分层的体系结构突出了三个主要的高级组件:应用程序,核心和基础结构。该应用程序包含所有批处理作业和开发人员使用Spring Batch编写的自定义代码。批处理核心包含启动和控制批处理作业所需的核心运行时类。它包括实现 JobLauncherJobStepApplication和Core都建立在通用基础架构之上。此基础结构包含通用的读写器和服务(例如RetryTemplate),应用程序开发人员(例如ItemReader和的读写器ItemWriter)和核心框架本身(重试,它是自己的库)都使用它们。

1.4.通用批处理原则和准则

构建批处理解决方案时,应考虑以下关键原则,准则和一般注意事项。

  • 请记住,批处理体系结构通常会影响在线体系结构,反之亦然。在可能的情况下,请使用通用的构建块同时考虑架构和环境进行设计。

  • 尽可能简化并避免在单个批处理应用程序中构建复杂的逻辑结构。

  • 将数据的处理和存储在物理上保持紧密联系(换句话说,将数据保存在发生处理的地方)。

  • 最小化系统资源的使用,尤其是I / O。在内存中执行尽可能多的操作。

  • 查看应用程序I / O(分析SQL语句)以确保避免不必要的物理I / O。特别是,需要寻找以下四个常见缺陷:

    • 当可以一次读取数据并将其缓存或保存在工作存储器中时,为每个事务读取数据。

    • 重新读取事务的数据,其中早先在同一事务中读取了数据。

    • 导致不必要的表或索引扫描。

    • 没有在SQL语句的WHERE子句中指定键值。

  • 不要在批处理中两次执行操作。例如,如果出于报告目的需要数据汇总,则应该(如果可能)在最初处理数据时增加存储的总数,因此报告应用程序不必重新处理相同的数据。

  • 在批处理应用程序开始时分配足够的内存,以避免在此过程中耗时的重新分配。

  • 关于数据完整性,请始终假设最坏的情况。插入足够的检查并记录验证以维护数据完整性。

  • 实施校验和以进行内部验证。例如,平面文件应具有预告片记录,以告知文件中的记录总数以及关键字段的集合。

  • 在具有实际数据量的类似生产的环境中,尽早计划和执行压力测试。

  • 在大型批处理系统中,备份可能会很困难,尤其是如果系统以24-7的方式在线联机运行时。在线设计中通常会很好地照顾数据库备份,但是文件备份也应同样重要。如果系统依赖平面文件,则不仅应建立文件备份程序并形成文件,还应进行定期测试。

1.5.批处理策略

为了帮助设计和实现批处理系统,应以示例结构图和代码外壳的形式向设计人员和程序员提供基本的批处理应用程序构建模块和模式。在开始设计批处理作业时,应将业务逻辑分解为一系列步骤,这些步骤可以使用以下标准构件来实现:

  • 转换应用程序:对于由外部系统提供或生成的每种文件类型,必须创建一个转换应用程序,以将提供的交易记录转换为处理所需的标准格式。这种批处理应用程序可以部分或全部由翻译实用程序模块组成(请参阅基本批处理服务)。

  • 验证应用程序:验证应用程序可确保所有输入/输出记录正确且一致。验证通常基于文件头和尾标,校验和和验证算法以及记录级别的交叉检查。

  • 提取应用程序:一种应用程序,它从数据库或输入文件中读取一组记录,根据预定义的规则选择记录,然后将记录写入输出文件中。

  • 提取/更新应用程序:一种应用程序,它从数据库或输入文件中读取记录,并根据每个输入记录中找到的数据来驱动对数据库或输出文件的更改。

  • 处理和更新应用程序:对提取或验证应用程序中的输入事务执行处理的应用程序。该处理通常涉及读取数据库以获得处理所需的数据,可能会更新数据库并创建记录以进行输出处理。

  • 输出/格式应用程序:读取输入文件,根据标准格式从该记录重组数据并生成输出文件以打印或传输到另一个程序或系统的应用程序。

此外,应为无法使用前面提到的构建块构建的业务逻辑提供基本的应用程序外壳。

除主要构建块外,每个应用程序还可以使用一个或多个标准实用程序步骤,例如:

  • 排序:一种程序,该程序读取输入文件并生成输出文件,其中已根据记录中的排序关键字字段对记录进行了重新排序。排序通常由标准系统实用程序执行。

  • 拆分:一种程序,该程序读取一个输入文件,并根据字段值将每个记录写入几个输出文件之一。拆分可以由参数驱动的标准系统实用程序定制或执行。

  • 合并:一种程序,可从多个输入文件中读取记录,并使用输入文件中的合并数据生成一个输出文件。可以通过参数驱动的标准系统实用程序来定制或执行合并。

批处理应用程序还可以按其输入源进行分类:

  • 数据库驱动的应用程序由从数据库检索的行或值驱动。

  • 文件驱动的应用程序由从文件中检索的记录或值驱动。

  • 消息驱动的应用程序由从消息队列检索的消息驱动。

任何批处理系统的基础都是处理策略。影响策略选择的因素包括:估计的批处理系统数量,与在线系统或其他批处理系统的并发性,可用的批处理窗口。(请注意,随着越来越多的企业希望24x7全天候运行,清晰的批处理窗口正在消失)。

批处理的典型处理选项是(按实现复杂度的升序排列):

  • 脱机模式下批处理窗口中的正常处理。

  • 并发批处理或联机处理。

  • 同时并行处理许多不同的批处理运行或作业。

  • 分区(在同一时间处理同一作业的许多实例)。

  • 前述选项的组合。

商业调度程序可能会支持其中一些或全部选项。

下一节将更详细地讨论这些处理选项。重要的是要注意,根据经验,批处理过程采用的提交和锁定策略取决于所执行的处理类型,并且在线锁定策略也应使用相同的原理。因此,在设计总体架构时,批处理架构不能只是简单的事后思考。

锁定策略可以是仅使用普通数据库锁定,也可以在体系结构中实施其他自定义锁定服务。锁定服务将跟踪数据库锁定(例如,通过将必要的信息存储在专用的db表中),并向请求db操作的应用程序授予或拒绝权限。此体系结构也可以实现重试逻辑,以避免在锁定情况下中止批处理作业。

1.批处理窗口中的常规处理对于在单独的批处理窗口中运行的简单批处理过程,在线用户或其他批处理过程不需要更新数据,并发不是问题,可以在站点上进行一次提交。批处理运行结束。

在大多数情况下,更健壮的方法更为合适。请记住,批处理系统在复杂性和处理的数据量方面都有随时间增长的趋势。如果没有锁定策略,并且系统仍依赖单个提交点,则修改批处理程序可能会很麻烦。因此,即使使用最简单的批处理系统,也要考虑对重新启动-恢复选项的提交逻辑的需求,以及有关本节稍后部分介绍的更复杂情况的信息。

2.并行批处理或联机处理可以由联机用户同时更新的批处理应用程序处理数据时,不应锁定联机用户可能需要超过200天的任何数据(数据库或文件中的数据)。几秒钟。另外,每隔几笔交易结束时,更新应提交给数据库。这样可以将其他进程不可用的数据部分和数据不可用的经过时间最小化。

最小化物理锁定的另一种选择是使用乐观锁定模式或悲观锁定模式来实现逻辑行级锁定。

  • 乐观锁定假定记录争用的可能性很小。通常,这意味着在批处理和联机处理同时使用的每个数据库表中插入一个时间戳列。当应用程序获取一行进行处理时,它还将获取时间戳。然后,当应用程序尝试更新已处理的行时,更新将使用WHERE子句中的原始时间戳。如果时间戳匹配,则更新数据和时间戳。如果时间戳不匹配,则表明另一个应用程序已在获取和更新尝试之间更新了同一行。因此,无法执行更新。

  • 悲观锁定是任何假定记录争用可能性很高的锁定策略,因此需要在检索时获得物理或逻辑锁定。一种悲观逻辑锁定使用数据库表中的专用锁定列。当应用程序检索要更新的行时,它将在锁列中设置一个标志。有了该标志,其他尝试检索同一行的应用程序在逻辑上将失败。当设置标志的应用程序更新该行时,它还会清除该标志,从而使该行可以被其他应用程序检索。请注意,在初始提取和设置标志之间还必须保持数据的完整性,例如通过使用db锁(例如SELECT FOR UPDATE)。还要注意,此方法与物理锁定具有相同的缺点,除了管理建立超时机制(如果用户在锁定记录的同时吃午饭时释放锁定)更容易管理之外。

这些模式不一定适用于批处理,但是它们可用于并发批处理和联机处理(例如在数据库不支持行级锁定的情况下)。通常,乐观锁定更适合于在线应用程序,而悲观锁定更适合于批处理应用程序。每当使用逻辑锁定时,必须对访问逻辑锁定保护的数据实体的所有应用程序使用相同的方案。

请注意,这两种解决方案都只解决锁定单个记录的问题。通常,我们可能需要锁定逻辑上相关的记录组。使用物理锁,您必须非常仔细地管理这些锁,以避免潜在的死锁。使用逻辑锁,通常最好构建一个逻辑锁管理器,该管理器了解您要保护的逻辑记录组,并可以确保锁是连贯的和非死锁的。此逻辑锁管理器通常使用自己的表进行锁管理,争用报告,超时机制和其他问题。

3.并行处理并行处理允许并行运行多个批处理运行或作业,以最大程度地减少总的批处理处理时间。只要作业不共享相同的文件,数据库表或索引空间,就没有问题。如果这样做,则应使用分区数据来实现此服务。另一种选择是通过使用控制表来构建用于维护相互依赖性的体系结构模块。控制表应为每个共享资源及其是否由应用程序使用而包含一行。然后,批处理体系结构或并行作业中的应用程序将从该表中检索信息,以确定它是否可以访问所需的资源。

如果数据访问没有问题,则可以通过使用其他线程进行并行处理来实现并行处理。在大型机环境中,传统上使用并行作业类,以确保所有进程有足够的CPU时间。无论如何,该解决方案必须足够强大,以确保所有正在运行的进程的时间片。

并行处理中的其他关键问题包括负载平衡和常规系统资源(例如文件,数据库缓冲池等)的可用性。还要注意,控制表本身很容易成为关键资源。

4.分区使用分区允许大型批处理应用程序的多个版本同时运行。这样做的目的是减少处理长时间批处理作业所需的时间。可以成功分区的进程是可以拆分输入文件和/或对主数据库表进行分区以允许应用程序针对不同的数据集运行的进程。

另外,必须将分区的进程设计为仅处理其分配的数据集。分区体系结构必须与数据库设计和数据库分区策略紧密联系在一起。请注意,数据库分区不一定意味着数据库的物理分区,尽管在大多数情况下这是可取的。下图说明了分区方法:

图1.2:分区过程
图2.分区过程

该架构应足够灵活,以允许动态配置分区数量。自动配置和用户控制配置均应考虑。自动配置可以基于诸如输入文件大小和输入记录数之类的参数。

4.1分区方法必须根据具体情况选择分区方法。下面的列表描述了一些可能的分区方法:

1.记录集的固定和均匀分解

这涉及将输入记录集分成偶数个部分(例如10个,其​​中每个部分恰好占整个记录集的1/10)。然后由批处理/提取应用程序的一个实例处理每个部分。

为了使用此方法,需要进行预处理以拆分记录集。拆分的结果将是一个上下限放置数,可以将其用作批处理/提取应用程序的输入,以便将其处理仅限于其部分。

预处理可能会产生很大的开销,因为它必须计算并确定记录集每个部分的界限。

2.按关键字列进行分解

这涉及通过键列(例如位置代码)分解输入记录集,并将数据从每个键分配给批处理实例。为了实现这一点,列值可以是:

  • 由分区表分配给批处理实例(在本节后面介绍)。

  • 通过一部分值(例如0000-0999、1000-1999等)分配给批处理实例。

在选项1下,添加新值意味着手动重新配置批处理/提取,以确保将新值添加到特定实例。

在选项2下,这确保通过批处理作业的实例覆盖所有值。但是,由一个实例处理的值的数量取决于列值的分布(在0000-0999范围内可能有很多位置,而在1000-1999范围内则很少)。在此选项下,数据范围的设计应考虑分区。

在这两种选择下,都无法实现记录到批处理实例的最佳均匀分配。没有动态配置所使用的批处理实例的数量。

3.按视图分解

这种方法基本上是按键列拆分的,但是在数据库级别。它涉及将记录集分解为视图。批处理应用程序的每个实例在处理过程中都会使用这些视图。分解是通过对数据进行分组来完成的。

使用此选项,必须将批处理应用程序的每个实例配置为命中特定视图(而不是主表)。同样,随着新数据值的添加,该新数据组必须包含在视图中。没有动态配置功能,因为实例数量的更改会导致视图的更改。

4.增加加工指标

这涉及在输入表中添加新列,该列用作指示符。作为预处理步骤,所有指标都标记为未处理。在批处理应用程序的记录获取阶段,将以该记录被标记为未处理的条件来读取记录,并且一旦读取(带锁)它们便被标记为正在处理。该记录完成后,指示符将更新为完成或错误。批处理应用程序的许多实例无需更改即可启动,因为附加列可确保记录仅被处理一次。按照“完成时,指标被标记为完成”的顺序排列一两句话。)

使用此选项,表上的I / O会动态增加。在更新批处理应用程序的情况下,由于必须进行写操作,因此减少了这种影响。

5.将表提取到平面文件

这涉及将表提取到文件中。然后可以将此文件分为多个段,并用作批处理实例的输入。

使用此选项,将表提取到文件中并将其拆分的额外开销可能会抵消多分区的影响。通过更改文件分割脚本可以实现动态配置。

6.哈希列的使用

该方案涉及在用于检索驱动程序记录的数据库表中添加哈希列(键/索引)。该哈希列具有指示符,用于确定批处理应用程序的哪个实例处理该特定行。例如,如果要启动三个批处理实例,则指示符“ A”标记为要由实例1处理的行,指示符“ B”标记为要按实例2处理的行,指示符为“ C” '标记一行以供实例3处理。

然后,用于检索记录的过程将具有一个附加WHERE子句,以选择由特定指示符标记的所有行。该表中的插入内容将涉及添加标记字段,该字段默认为实例之一(例如“ A”)。

一个简单的批处理应用程序将用于更新指标,例如在不同实例之间重新分配负载。添加足够多的新行后,可以运行该批处理(除批处理窗口外,随时可以)将新行重新分配给其他实例。

批处理应用程序的其他实例仅需要运行如前几段所述的批处理应用程序,即可重新分配指示符以与新数量的实例一起使用。

4.2数据库和应用程序设计原则

支持使用键列方法针对分区数据库表运行的多分区应用程序的体系结构应包括用于存储分区参数的中央分区存储库。这提供了灵活性并确保了可维护性。该存储库通常由一个表(称为分区表)组成。

分区表中存储的信息是静态的,通常应由DBA维护。该表应包含多分区应用程序每个分区的一行信息。该表应包含“程序ID代码”,“分区号”(分区的逻辑ID),此分区的db键列的“低”值和此分区的db键列的“高”列。

在程序启动时,id应将程序和分区号从体系结构(特别是从“控制处理任务”)传递给应用程序。如果使用键列方法,则这些变量用于读取分区表,以确定应用程序要处理的数据范围。此外,在整个处理过程中必须使用分区号,以便:

  • 添加到输出文件/数据库更新中以使合并过程正常运行。

  • 将正常处理报告给批处理日志,并将任何错误报告给体系结构错误处理程序。

4.3最小化死锁

当应用程序并行运行或分区时,数据库资源中的争用和死锁可能发生。至关重要的是,数据库设计团队应尽可能消除潜在的争用情况,这是数据库设计的一部分。

而且,开发人员必须确保在设计数据库索引表时要牢记防止死锁和性能。

死锁或热点通常发生在管理表或体系结构表中,例如日志表,控制表和锁定表。还应考虑这些含义。实际的压力测试对于确定体系结构中的可能瓶颈至关重要。

为了最大程度地减少冲突对数据的影响,体系结构应在连接到数据库或遇到死锁时提供诸如重试间隔等服务。这意味着内置机制可以对某些数据库返回码做出反应,而不是发出立即错误,而是等待预定时间并重试数据库操作。

4.4参数传递和验证

分区体系结构对于应用程序开发人员应该相对透明。该体系结构应执行与在分区模式下运行应用程序相关的所有任务,包括:

  • 在应用程序启动之前检索分区参数。

  • 在应用程序启动之前验证分区参数。

  • 在启动时将参数传递给应用程序。

验证应包括检查以确保:

  • 该应用程序具有足够的分区来覆盖整个数据范围。

  • 分区之间没有间隙。

如果数据库已分区,则可能需要进行一些其他验证,以确保单个分区不会跨越数据库分区。

同样,该体系结构应考虑分区的合并。关键问题包括:

  • 在进入下一个作业步骤之前,是否必须完成所有分区?

  • 如果其中一个分区中止会怎样?

2. Spring Batch 4.2的新增功能

Spring Batch 4.2增加了以下功能:

2.1.千分尺的批次指标

此版本引入了一项新功能,使您可以使用测微计来监视批处理作业。默认情况下,Spring Batch收集指标(例如作业持续时间,步骤持续时间,项目读写吞吐量等),并在spring.batch前缀下的Micrometer全局指标注册表中注册它们这些度量可以发送到 Micrometer支持的任何监视系统

有关此功能的更多详细信息,请参阅“ 监视和指标”一章。

2.2.Apache Kafka项目读取器/写入器

这个版本增加了一个新的KafkaItemReaderKafkaItemWriter读取数据并将其写入Kafka主题。有关这些新组件的更多详细信息,请参考Javadoc

2.3.Apache Avro项目读取器/写入器

此版本增加了一个新功能AvroItemReaderAvroItemWriter可以从Avro资源中读取数据并将其写入其中。有关这些新组件的更多详细信息,请参考Javadoc

2.4.文档更新

参考文档已更新,以匹配与其他Spring项目相同的样式。

3.批处理的域语言

对于任何经验丰富的批处理设计师而言,Spring Batch中使用的批处理的总体概念应该是熟悉且舒适的。有“工作”和“步骤”,并要求开发人员提供处理单元ItemReaderItemWriter但是,由于存在Spring模式,操作,模板,回调和惯用语,因此有以下机会:

  • 遵守关注点明显分开的情况得到了显着改善。

  • 清晰地描述了作为接口提供的体系结构层和服务。

  • 简单和默认的实现方式,可以快速采用,开箱即用。

  • 显着增强的可扩展性。

下图是已使用了数十年的批处理参考体系结构的简化版本。它概述了组成批处理域语言的组件。该体系结构框架是一个蓝图,已经在最后几代平台(COBOL / Mainframe,C / Unix,现在是Java /任何地方)上数十年的实现中得到了证明。JCL和COBOL开发人员可能会像C,C#和Java开发人员一样熟悉这些概念。Spring Batch提供了层,组件和技术服务的物理实现,这些层,组件和技术服务通常在健壮,可维护的系统中找到,这些系统用于解决从简单到复杂的批处理应用程序的创建,其基础结构和扩展可以满足非常复杂的处理需求。

图2.1:批处理原型
图3.批处理原型

上图突出显示了构成Spring Batch领域语言的关键概念。作业有一个到多个步骤,每个步骤都只有一个ItemReader,一个ItemProcessor和一个步骤ItemWriter需要启动一个作业(带有 JobLauncher),并且需要存储有关当前正在运行的进程的元数据(位于中 JobRepository)。

3.1.Job

本节描述与批处理作业的概念有关的构造型。A Job是封装整个批处理过程的实体。与其他Spring项目一样,a Job与XML配置文件或基于Java的配置连接在一起。该配置可以被称为“作业配置”。但是, Job这只是整个层次结构的顶部,如下图所示:

工作层次
图4.作业层次结构

在Spring Batch中,a Job只是Step实例的容器它组合了逻辑上属于流程的多个步骤,并允许配置所有步骤全局的属性,例如可重新启动性。作业配置包含:

  • 作业的简单名称。

  • Step实例的定义和顺序

  • 作业是否可重新启动。

Spring Batch以SimpleJob的形式提供Job接口的默认简单实现,该实现在之上创建了一些标准功能Job使用基于Java的配置时,可使用一组构建器来实例化a Job,如以下示例所示:

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

3.1.1.JobInstance

A JobInstance是指逻辑作业运行的概念。考虑一个应该在一天结束时运行一次的批处理作业,例如Job上图中的“ EndOfDay” 有一个“ EndOfDay”作业,但是Job必须单独跟踪每个运行在这项工作中,JobInstance每天只有一个逻辑例如,有1月1日运行,1月2日运行,依此类推。如果1月1日运行第一次失败并在第二天再次运行,则仍是1月1日运行。(通常,这也与它正在处理的数据相对应,这意味着1月1日运行处理1月1日的数据)。因此,每个都JobInstance可以有多个执行(JobExecution本章稍后将详细讨论),并且只有一个JobInstance与特定内容相对应JobJobParameters可以在给定时间运行。

a的定义JobInstance绝对与要加载的数据无关。完全取决于ItemReader实现来确定如何加载数据。例如,在EndOfDay方案中,数据上可能有一列指示该数据所属的“生效日期”或“计划日期”。因此,1月1日的运行将仅加载第1次的数据,而1月2日的运行将仅使用第2次的数据。由于此确定可能是一项业务决策,因此由 ItemReader决定。但是,使用同一个参数JobInstance可以确定是否使用ExecutionContext先前执行中的“状态”(即本章稍后讨论的)。使用新的JobInstance 表示“从头开始”,而使用现有实例通常表示“从上次中断的地方开始”。

3.1.2.作业参数

在讨论JobInstance了它与Job的不同之处之后,自然要问的问题是:“一个人JobInstance与另一个人有什么区别?” 答案是: JobParameters一个JobParameters对象拥有一组用于启动批处理作业的参数。它们可以在运行期间用于标识甚至用作参考数据,如下图所示:

工作参数
图5.作业参数

在前面的示例中,有两个实例,一个实例是1月1日,另一个实例是1月2日,实际上只有一个实例,Job但是它有两个JobParameter对象:一个对象的作业参数为01-01-2017,另一个为对象它以01-02-2017参数开始。因此,合同可以定义为:JobInstance= Job +标识JobParameters这使开发人员可以有效地控制a JobInstance的定义方式,因为他们可以控制传入的参数。

并非所有作业参数都需要有助于识别 JobInstance默认情况下,它们会这样做。但是,该框架还允许提交Job带有对a 的身份无贡献的参数的a JobInstance

3.1.3.工作执行

A JobExecution是指一次尝试运行Job的技术概念。执行可能以失败或成功结束,但是JobInstance与给定执行相对应的执行除非成功完成,否则不视为完成。Job前面所述的EndOfDay 为例,考虑JobInstance2017年1月1日的首次运行失败。如果使用与第一次运行(01-01-2017)相同的标识作业参数再次运行,JobExecution则会创建一个新的。但是,仍然只有一个JobInstance

A Job定义什么是作业及其执行方式,a JobInstance是将执行组合在一起的纯粹的组织对象,主要是为了启用正确的重新启动语义。JobExecution但是,A 是运行期间实际发生情况的主要存储机制,它包含许多必须控制和持久化的属性,如下表所示:

表1. JobExecution属性

属性

定义

状态

BatchStatus对象,指示执行的状态。在运行时,它是 BatchStatus#STARTED如果失败,则为BatchStatus#FAILED如果成功完成,那就是BatchStatus#COMPLETED

开始时间

一个java.util.Date代表当执行开始时的当前系统时间。如果作业尚未开始,则此字段为空。

时间结束

一个java.util.Date代表当执行完成后,无论它是否是成功的当前系统时间。如果作业尚未完成,则该字段为空。

退出状态

ExitStatus,说明运行的结果。这是最重要的,因为它包含返回给调用方的退出代码。有关更多详细信息,请参见第5章。如果作业尚未完成,则该字段为空。

createTime

java.util.Date表示当当前系统时间JobExecution最早持续。作业可能尚未启动(因此没有启动时间),但是它始终具有createTime,这是管理作业级别的框架所需的 ExecutionContexts

最近更新时间

java.util.Date代表上一次JobExecution持续存在的A。如果作业尚未开始,则此字段为空。

executionContext

“属性包”包含两次执行之间需要保留的所有用户数据。

failureExceptions

执行Job。时遇到的异常列表如果在失败时遇到多个异常,这些功能将非常有用Job

这些属性很重要,因为它们可以持久保存并且可以用来完全确定执行状态。例如,如果01-01的EndOfDay作业在9:00 PM执行而在9:30失败,则在批处理元数据表中进行以下输入:

表2. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表3. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

识别

1

日期

schedule.Date

2017-01-01

真正

表4. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

状态

1

1

2017-01-01 21:00

2017-01-01 21:30

失败

为了清楚和格式化,列名可能已被缩写或删除。

现在工作失败了,假设确定问题已花费了一整夜,因此“批处理窗口”现在关闭了。进一步假设该窗口在9:00 PM开始,则该作业将在01-01再次开始,从停止的地方开始,并在9:30成功完成。因为现在是第二天,所以也必须运行01-02作业,此作业随后才在9:31开始,并在正常的一小时时间内在10:30完成。并不需要一个接一个JobInstance地启动,除非两个作业有可能尝试访问相同的数据,从而导致在数据库级别锁定的问题。完全由调度程序确定何时Job应运行a。由于它们是分开的JobInstances,Spring Batch不会尝试阻止它们同时运行。(尝试JobInstance在另一个已经运行的情况下运行相同的结果 JobExecutionAlreadyRunningException会抛出该错误)。现在,JobInstanceJobParameters中都应该有一个额外的条目,并且表中应该有两个额外的条目, JobExecution如下表所示:

表5. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

2

EndOfDayJob

表6. BATCH_JOB_EXECUTION_PARAMS

JOB_EXECUTION_ID

TYPE_CD

KEY_NAME

DATE_VAL

识别

1

日期

schedule.Date

2017-01-01 00:00:00

真正

2

日期

schedule.Date

2017-01-01 00:00:00

真正

3

日期

schedule.Date

2017-01-02 00:00:00

真正

表7. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

状态

1

1

2017-01-01 21:00

2017-01-01 21:30

失败

2

1

2017-01-02 21:00

2017-01-02 21:30

已完成

3

2

2017-01-02 21:31

2017-01-02 22:29

已完成

为了清楚和格式化,列名可能已被缩写或删除。

3.2.

A Step是一个域对象,封装了批处理作业的一个独立的顺序阶段。因此,每个工作完全由一个或多个步骤组成。一个Step包含了所有的定义和控制实际的批量处理所需的信息。这是一个模糊的描述,因为任何给定的内容Step都是由开发人员决定编写的JobA Step可以根据开发人员的需求简单或复杂。一个简单的方法Step可能会将文件中的数据加载到数据库中,几乎不需要代码(取决于所使用的实现)。较复杂的 Step业务规则可能包含复杂的业务规则,这些规则将在处理过程中应用。与a一样Job,a Step有一个个体StepExecution与unique相关联 JobExecution,如下图所示:

图2.1:带步骤的作业层次结构
图6.带步骤的作业层次结构

3.2.1.步骤执行

A StepExecution代表执行的单次尝试StepStepExecution 每次Step运行a都会创建一个新内容,类似于JobExecution但是,如果某个步骤由于执行失败而无法执行,则不会继续执行。A StepExecution仅在其Step实际启动时创建

Step执行由StepExecution的对象表示每个执行都包含对其相应步骤和JobExecution与事务相关的数据的引用,例如提交和回滚计数以及开始和结束时间。此外,每个步骤执行都包含一个ExecutionContext,其中包含开发人员在批处理运行中需要保留的所有数据,例如重新启动所需的统计信息或状态信息。下表列出了的属性StepExecution

表8. StepExecution属性

属性

定义

状态

BatchStatus对象,指示执行的状态。在运行时,状态为BatchStatus.STARTED如果失败,则状态为BatchStatus.FAILED如果成功完成,则状态为BatchStatus.COMPLETED

开始时间

一个java.util.Date代表当执行开始时的当前系统时间。如果步骤尚未开始,则此字段为空。

时间结束

一个java.util.Date代表当执行完成后,无论它是否是成功的当前系统时间。如果步骤尚未退出,则此字段为空。

退出状态

ExitStatus指示执行的结果。这是最重要的,因为它包含返回给调用方的退出代码。有关更多详细信息,请参见第5章。如果作业尚未退出,则此字段为空。

executionContext

“属性包”包含两次执行之间需要保留的所有用户数据。

读取计数

已成功读取的项目数。

writeCount

已成功写入的项目数。

commitCount

为此执行已提交的事务数。

rollbackCount

由所控制的业务交易Step已回滚的次数。

readSkipCount

read失败次数导致项目被跳过。

processSkipCount

process失败次数导致项目被跳过。

filterCount

已被“过滤”的项目数ItemProcessor

writeSkipCount

write失败次数导致项目被跳过。

3.3.执行上下文

An ExecutionContext表示键/值对的集合,这些键/值对由框架进行持久化和控制,以便允许开发人员放置一个存储范围为StepExecution对象或JobExecution对象的持久状态的位置对于熟悉Quartz的人来说,它与JobDataMap非常相似。最佳用法示例是促进重新启动。以平面文件输入为例,在处理单独的行时,框架会定期保留ExecutionContext提交点。这样做可以ItemReader在运行期间发生致命错误或断电的情况下存储其状态。所需要做的就是将当前读取的行数放入上下文中,如下面的示例所示,框架将完成其余工作:

executionContext.putLong(getKey(LINES_READ_COUNT), reader.getPosition());

JobStereotypes部分的EndOfDay示例为例,假设有一个步骤“ loadData”将文件加载到数据库中。第一次失败运行后,元数据表将类似于以下示例:

表9. BATCH_JOB_INSTANCE

JOB_INST_ID

JOB_NAME

1

EndOfDayJob

表10. BATCH_JOB_EXECUTION_PARAMS

JOB_INST_ID

TYPE_CD

KEY_NAME

DATE_VAL

1

日期

schedule.Date

2017-01-01

表11. BATCH_JOB_EXECUTION

JOB_EXEC_ID

JOB_INST_ID

开始时间

时间结束

状态

1

1

2017-01-01 21:00

2017-01-01 21:30

失败

表12. BATCH_STEP_EXECUTION

STEP_EXEC_ID

JOB_EXEC_ID

STEP_NAME

开始时间

时间结束

状态

1

1

loadData

2017-01-01 21:00

2017-01-01 21:30

失败

表13. BATCH_STEP_EXECUTION_CONTEXT

STEP_EXEC_ID

SHORT_CONTEXT

1

{piece.count = 40321}

在上述情况下,Step运行了30分钟并处理了40321个“件”,在这种情况下,这代表了文件中的行。此值会在框架每次提交之前更新,并且可以包含与中的条目相对应的多行 ExecutionContext在提交之前被通知需要各种 StepListener实现之一(或ItemStream),本指南后面将对此进行详细讨论。与前面的示例一样,假定Job于第二天重新启动。重新启动后,ExecutionContext将从数据库中重新构建上次运行的值ItemReader被打开时,它可以检查以查看它是否具有在上下文任何存储的状态,并从那里初始化自身,如图以下示例:

if (executionContext.containsKey(getKey(LINES_READ_COUNT))) {
    log.debug("Initializing for restart. Restart data is: " + executionContext);

    long lineCount = executionContext.getLong(getKey(LINES_READ_COUNT));

    LineReader reader = getReader();

    Object record = "";
    while (reader.getPosition() < lineCount && record != null) {
        record = readLine();
    }
}

在这种情况下,运行上述代码后,当前行为40322,从而允许Step 再次从中断处开始。ExecutionContext也可用于那些需要被保留的关于运行本身的统计数据。例如,如果一个平面文件包含跨多行存在的处理订单,则可能有必要存储已处理的订单数量(与读取的行数有很大不同),以便可以通过以下方式发送电子邮件:结束Step于正文中处理的订单总数。框架会为开发人员处理存储的内容,以便将其正确地分配给个人JobInstance很难知道是否存在ExecutionContext是否应该使用。例如,使用上面的“ EndOfDay”示例,当01-01运行再次第二次开始时,框架会识别出相同JobInstance并且是单独的Step,将其ExecutionContext拉出数据库,并将其移交给数据库(作为自身的一部分 StepExecutionStep相反,对于01-02运行,框架识别出它是一个不同的实例,因此必须将空上下文传递给 Step框架为开发人员做出了许多类型的确定,以确保在正确的时间将状态提供给开发人员。同样重要的是要注意,在任何给定时间都ExecutionContext存在一个StepExecution客户ExecutionContext应该小心,因为这会创建一个共享的键空间。因此,在输入值时应注意确保没有数据被覆盖。但是,Step存储绝对不会在上下文中存储任何数据,因此没有办法对框架产生不利影响。

同样重要的是要注意,至少有一个ExecutionContextJobExecution一个用于每一个StepExecution例如,考虑以下代码片段:

ExecutionContext ecStep = stepExecution.getExecutionContext();
ExecutionContext ecJob = jobExecution.getExecutionContext();
//ecStep does not equal ecJob

如评论中所述,ecStep不等于ecJob他们是两个不同的人 ExecutionContexts范围为的Step一个保存在中的每个提交点 Step,而范围为的一个保存在每次Step执行之间

3.4.JobRepository

JobRepository是上述所有构造型的持久性机制。它提供了CRUD操作JobLauncherJob以及Step实现。Job第一次启动,一个JobExecution被从库中获得,并且,执行的过程中,StepExecutionJobExecution实施方式是通过将它们传递到存储库持续。

使用Java配置时,@EnableBatchProcessing注释提供了a JobRepository作为开箱即用自动配置的组件之一。

3.5.JobLauncher

JobLauncher代表一个简单的界面,用于Job使用的给定集合 启动JobParameters,如以下示例所示:

public interface JobLauncher {

public JobExecution run(Job job, JobParameters jobParameters)
            throws JobExecutionAlreadyRunningException, JobRestartException,
                   JobInstanceAlreadyCompleteException, JobParametersInvalidException;
}

预计实现获得有效JobExecution距离 JobRepository和执行Job

3.6.物品阅读器

ItemReader是一种抽象,表示一次检索一项的输入StepItemReader用尽了它可以提供的物品时,它通过返回来表明这一点null有关ItemReader接口及其各种实现的更多详细信息,请参见 读者和作家

3.7.项目作家

ItemWriter是一个抽象,一次代表一个Step,一批或大块项目的输出通常,一个ItemWriter人不知道接下来应该接收的输入,并且只知道当前调用中传递的项目。有关ItemWriter接口及其各种实现的更多详细信息,请参见 读者和作家

3.8.项目处理器

ItemProcessor是表示项目的业务处理的抽象。ItemReader读取一项并将其ItemWriter写入的同时,它们 ItemProcessor提供了一个访问点来转换或应用其他业务处理。如果在处理项目时确定该项目无效,则返回 null指示不应将该项目写出。有关该ItemProcessor接口的更多详细信息,请 参见 读者和作家

4.配置和运行作业

在“ 领域”部分中,使用下图作为指南讨论了总体体系结构设计:

图2.1:批处理原型
图7.批处理原型

尽管Job对象看起来像是简单的步骤容器,但开发人员必须知道许多配置选项。此外,对于如何Job运行a 及其在运行期间如何存储其元数据,有许多考虑因素本章将解释.NET的各种配置选项和运行时问题Job

4.1.配置作业

Job接口有多种实现,但是构建器可以消除配置上的差异。

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .start(playerLoad())
                     .next(gameLoad())
                     .next(playerSummarization())
                     .end()
                     .build();
}

Job(并且典型地任何Step在其内)需要一个JobRepository的配置JobRepository通过进行处理BatchConfigurer

上面的示例说明了Job由三个Step实例组成的与工作相关的构建器还可以包含有助于并行化(Split),声明性流控制(Decision)和流定义的外部化()的其他元素Flow

4.1.1.可重启性

执行批处理作业时的一个关键问题与Job重新启动时的行为有关的启动 Job被认为是一个“重新启动”,如果 JobExecution已经存在特定的 JobInstance理想情况下,所有作业都应该能够从中断的地方开始,但是在某些情况下这是不可能的。开发人员完全有责任确保JobInstance在这种情况下创建一个新文件但是,Spring Batch确实提供了一些帮助。如果a Job绝不应该重新启动,而应始终作为new的一部分运行JobInstance,则可重新启动属性可以设置为'false':

Java配置
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .preventRestart()
                     ...
                     .build();
}

换句话说,将restartable设置为false意味着“这 Job不支持再次启动”。重新启动Job无法重新启动的JobRestartException,将引发:

Job job = new SimpleJob();
job.setRestartable(false);

JobParameters jobParameters = new JobParameters();

JobExecution firstExecution = jobRepository.createJobExecution(job, jobParameters);
jobRepository.saveOrUpdate(firstExecution);

try {
    jobRepository.createJobExecution(job, jobParameters);
    fail();
}
catch (JobRestartException e) {
    // expected
}

此JUnit代码片段显示了尝试JobExecution为不可重新启动的作业首次创建 不会造成任何问题。但是,第二次尝试将抛出JobRestartException

4.1.2.拦截作业执行

在执行作业的过程中,通知其生命周期中的各种事件可能很有用,以便可以执行自定义代码。SimpleJob允许通过调用一个 JobListener在适当的时候:

public interface JobExecutionListener {

    void beforeJob(JobExecution jobExecution);

    void afterJob(JobExecution jobExecution);

}

JobListeners可以SimpleJob通过作业上的listeners元素添加到中

Java配置
@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
                     .listener(sampleListener())
                     ...
                     .build();
}

应该注意的是,afterJob无论作业成功与否都会调用。如果需要确定成功或失败,可以从以下位置获得JobExecution

public void afterJob(JobExecution jobExecution){
    if( jobExecution.getStatus() == BatchStatus.COMPLETED ){
        //job success
    }
    else if(jobExecution.getStatus() == BatchStatus.FAILED){
        //job failure
    }
}

与此接口对应的注释为:

  • @BeforeJob

  • @AfterJob

4.1.4.JobParametersValidator

在XML名称空间中声明的作业或使用的任何子类 AbstractJob可以选择在运行时声明作业参数的验证器。例如,当您需要断言一个作业使用其所有必填参数启动时,此功能很有用。有一个 DefaultJobParametersValidator可用于约束简单的强制性和可选参数的组合,对于更复杂的约束,您可以自己实现接口。

通过Java构建器支持验证器的配置,例如:

@Bean
public Job job1() {
    return this.jobBuilderFactory.get("job1")
                     .validator(parametersValidator())
                     ...
                     .build();
}

4.2.Java配置

除了XML,Spring 3还提供了通过Java配置应用程序的功能。从Spring Batch 2.2.0开始,可以使用相同的Java配置来配置批处理作业。基于Java的配置有两个组件:@EnableBatchProcessing批注和两个构建器。

这些@EnableBatchProcessing作品与Spring系列中的其他@ Enable *注释相似。在这种情况下, @EnableBatchProcessing提供用于构建批处理作业的基本配置。在此基本配置中,StepScope除了提供许多可自动装配的bean之外还创建了的实例

  • JobRepository -bean名称“ jobRepository”

  • JobLauncher -豆子名称“ jobLauncher”

  • JobRegistry -bean名称“ jobRegistry”

  • PlatformTransactionManager -Bean名称“ transactionManager”

  • JobBuilderFactory -bean名称“ jobBuilders”

  • StepBuilderFactory -bean名称“ stepBuilders”

此配置的核心接口是BatchConfigurer默认实现提供了上述的Bean,并且需要DataSource在上下文中将作为Bean。JobRepository将使用此数据源。您可以通过创建BatchConfigurer接口的自定义实现来自定义这些bean中的任何一个通常,扩展DefaultBatchConfigurer(如果BatchConfigurer未找到a时提供 )并覆盖所需的吸气剂就足够了。但是,可能需要从头实施自己的方法。以下示例显示如何提供自定义事务管理器:

@Bean
public BatchConfigurer batchConfigurer() {
	return new DefaultBatchConfigurer() {
		@Override
		public PlatformTransactionManager getTransactionManager() {
			return new MyTransactionManager();
		}
	};
}

只有一个配置类需要具有 @EnableBatchProcessing注释。在为课程加上注释后,您将可以使用上述所有内容。

使用基本配置后,用户可以使用提供的构建器工厂来配置作业。以下是通过JobBuilderFactory配置的两步作业的示例 StepBuilderFactory

@Configuration
@EnableBatchProcessing
@Import(DataSourceConfiguration.class)
public class AppConfig {

    @Autowired
    private JobBuilderFactory jobs;

    @Autowired
    private StepBuilderFactory steps;

    @Bean
    public Job job(@Qualifier("step1") Step step1, @Qualifier("step2") Step step2) {
        return jobs.get("myJob").start(step1).next(step2).build();
    }

    @Bean
    protected Step step1(ItemReader<Person> reader,
                         ItemProcessor<Person, Person> processor,
                         ItemWriter<Person> writer) {
        return steps.get("step1")
            .<Person, Person> chunk(10)
            .reader(reader)
            .processor(processor)
            .writer(writer)
            .build();
    }

    @Bean
    protected Step step2(Tasklet tasklet) {
        return steps.get("step2")
            .tasklet(tasklet)
            .build();
    }
}

4.3.配置JobRepository

使用时@EnableBatchProcessingJobRepository开箱即用为您提供a 本节介绍配置您自己的内容。

如前所述,JobRepository用于Spring Batch中各种持久化域对象(例如JobExecution和)的 基本CRUD操作 StepExecution它是由许多主要的框架功能要求,如JobLauncherJobStep

使用Java配置时,JobRepository会为您提供a 如果提供了a,DataSource则直接提供Map基于JDBC的JDBC 否则不提供基于JDBC的JDBC 但是,您可以JobRepository通过BatchConfigurer接口的实现来自定义通道的 配置。

Java配置
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_SERIALIZABLE");
    factory.setTablePrefix("BATCH_");
    factory.setMaxVarCharLength(1000);
    return factory.getObject();
}
...

除了dataSource和transactionManager外,不需要上面列出的配置选项。如果未设置,将使用上面显示的默认值。出于意识目的,它们在上面显示。varchar的最大长度默认为2500,这是示例架构脚本中VARCHAR的长度

4.3.1.JobRepository的事务配置

如果使用名称空间或提供的名称空间FactoryBean,则将在存储库周围自动创建事务建议。这是为了确保批元数据(包括故障后重新启动所必需的状态)得以正确保存。如果存储库方法不是事务性的,则框架的行为无法很好地定义。create*方法属性中的隔离级别是分别指定的,以确保启动作业时,如果两个进程试图同时启动同一作业,则只有一个成功。该方法的默认隔离级别为SERIALIZABLE,这非常激进:READ_COMMITTED也可以工作;如果两个进程不太可能以这种方式冲突,则READ_UNCOMMITTED会很好。但是,由于 create*该方法很短,只要数据库平台支持,SERIALIZED就不会引起问题。但是,可以重写:

Java配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setIsolationLevelForCreate("ISOLATION_REPEATABLE_READ");
    return factory.getObject();
}

如果不使用名称空间或工厂Bean,那么使用AOP配置存储库的事务行为也很重要:

Java配置
@Bean
public TransactionProxyFactoryBean baseProxy() {
	TransactionProxyFactoryBean transactionProxyFactoryBean = new TransactionProxyFactoryBean();
	Properties transactionAttributes = new Properties();
	transactionAttributes.setProperty("*", "PROPAGATION_REQUIRED");
	transactionProxyFactoryBean.setTransactionAttributes(transactionAttributes);
	transactionProxyFactoryBean.setTarget(jobRepository());
	transactionProxyFactoryBean.setTransactionManager(transactionManager());
	return transactionProxyFactoryBean;
}

4.3.2.更改表前缀

的另一个可修改属性 JobRepository是元数据表的表前缀。默认情况下,它们都以BATCH_开头。BATCH_JOB_EXECUTION和BATCH_STEP_EXECUTION是两个示例。但是,存在修改此前缀的潜在原因。如果需要在表名之前添加模式名称,或者在同一模式中需要一组以上的元数据表,则需要更改表前缀:

Java配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setTransactionManager(transactionManager);
    factory.setTablePrefix("SYSTEM.TEST_");
    return factory.getObject();
}

鉴于以上更改,对元数据表的每个查询都将以“ SYSTEM.TEST_”为前缀。BATCH_JOB_EXECUTION将被称为SYSTEM.TEST_JOB_EXECUTION。

仅表前缀是可配置的。表名和列名不是。

4.3.3.内存中的存储库

在某些情况下,您可能不想将域对象持久保存到数据库中。原因之一可能是速度。在每个提交点存储域对象会花费额外的时间。另一个原因可能是您不需要为特定工作保留状态。因此,Spring批处理提供了作业存储库的内存Map版本:

Java配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    MapJobRepositoryFactoryBean factory = new MapJobRepositoryFactoryBean();
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

请注意,内存中的存储库是易失性的,因此不允许在JVM实例之间重新启动。它还不能保证两个具有相同参数的作业实例同时启动,并且不适合在多线程Job或本地分区中使用Step因此,只要需要这些功能,就可以使用存储库的数据库版本。

但是,它确实需要定义事务管理器,因为存储库中存在回滚语义,并且由于业务逻辑可能仍是事务性的(例如RDBMS访问)。出于测试目的,许多人发现它 ResourcelessTransactionManager很有用。

4.3.4.存储库中的非标准数据库类型

如果使用的数据库平台不在受支持的平台列表中,并且SQL变量足够接近,则可以使用一种受支持的类型。为此,您可以使用raw JobRepositoryFactoryBean而不是名称空间快捷方式,并使用它来将数据库类型设置为最接近的匹配项:

Java配置
// This would reside in your BatchConfigurer implementation
@Override
protected JobRepository createJobRepository() throws Exception {
    JobRepositoryFactoryBean factory = new JobRepositoryFactoryBean();
    factory.setDataSource(dataSource);
    factory.setDatabaseType("db2");
    factory.setTransactionManager(transactionManager);
    return factory.getObject();
}

如果未指定JobRepositoryFactoryBeanDataSource则会尝试从中自动检测数据库类型。)平台之间的主要差异主要是由增加主键的策略来解决的,因此通常可能也需要重写主键 incrementerFactory(使用一个Spring框架中的标准实现)。

如果甚至不起作用,或者您没有使用RDBMS,那么唯一的选择可能是实现DaoSimpleJobRepository依赖的各种接口,并以正常的Spring方式手动将其连接起来。

4.4.配置JobLauncher

使用时@EnableBatchProcessingJobRegistry开箱即用为您提供a 本节介绍配置您自己的内容。

JobLauncher接口的最基本实现 SimpleJobLauncher它唯一需要的依赖项是JobRepository,以获得执行:

Java配置
...
// This would reside in your BatchConfigurer implementation
@Override
protected JobLauncher createJobLauncher() throws Exception {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository);
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}
...

一旦获得JobExecution,它将被传递给Job的execute方法,最终将其返回 JobExecution给调用者:

作业启动器序列
图8. Job Launcher序列

该序列很简单,从调度程序启动时效果很好。但是,尝试从HTTP请求启动时会出现问题。在这种情况下,启动需要异步完成,以便SimpleJobLauncher立即返回到其调用方。这是因为在长时间运行的进程(例如批处理)所需的时间内保持HTTP请求打开的时间不是一种好习惯。下面是一个示例序列:

异步作业启动器序列
图9.异步作业启动器序列

SimpleJobLauncher可以很容易地配置为允许这种情况下通过配置 TaskExecutor

Java配置
@Bean
public JobLauncher jobLauncher() {
	SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
	jobLauncher.setJobRepository(jobRepository());
	jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
	jobLauncher.afterPropertiesSet();
	return jobLauncher;
}

spring TaskExecutor 接口的任何实现都可以用来控制如何异步执行作业。

4.5.运行工作

至少,启动批处理作业需要两件事: Job要启动的和 JobLauncher两者都可以包含在相同或不同的上下文中。例如,如果从命令行启动作业,则将为每个Job实例化一个新的JVM,因此每个作业都有自己的JobLauncher但是,如果从范围内的Web容器中运行 HttpRequest,通常将JobLauncher配置一个 ,用于异步作业启动,多个请求将被调用以启动其作业。

4.5.1.从命令行运行作业

对于希望从企业计划程序运行其作业的用户,命令行是主要界面。这是因为大多数调度程序(除非是Quartz,除非使用NativeJob除外)都直接与操作系统进程配合使用,而这些进程主要是从Shell脚本开始的。除了Shell脚本(例如Perl,Ruby)甚至“构建工具”(例如ant或maven)之外,还有许多启动Java进程的方法。但是,由于大多数人都熟悉Shell脚本,因此本示例将重点介绍它们。

CommandLineJobRunner

因为启动作业的脚本必须启动Java虚拟机,所以需要一个具有main方法的类作为主要入口点。Spring Batch提供了一个实现此目的的实现: CommandLineJobRunner重要的是要注意,这只是引导应用程序的一种方法,但是有许多方法可以启动Java进程,并且绝对不应将此类视为权威。CommandLineJobRunner 执行四项任务:

  • 加载适当的 ApplicationContext

  • 将命令行参数解析为 JobParameters

  • 根据参数找到合适的工作

  • 使用JobLauncher应用程序上下文中提供的启动工作。

所有这些任务仅使用传入的参数即可完成。以下是必填参数:

表14. CommandLineJobRunner参数

jobPath

将用于创建XML文件的XML文件的位置ApplicationContext该文件应包含运行完整作业所需的所有内容

jobName

要运行的作业的名称。

这些参数必须首先以路径传递,然后以名称传递。这些之后的所有参数都被认为是 JobParameters并且必须采用'name = value'的格式:

<bash$ java CommandLineJobRunner io.spring.EndOfDayJobConfiguration endOfDay schedule.date(date)=2007/05/05

在大多数情况下,您可能想使用清单在jar中声明您的主类,但为简单起见,直接使用了该类。此示例使用domainLanguageOfBatch中的相同“ EndOfDay”示例第一个参数是“ io.spring.EndOfDayJobConfiguration”,它是包含Job的配置类的完全限定类名。第二个参数'endOfDay'表示作业名称。最后一个参数'schedule.date(date)= 2007/05/05'将转换为JobParameters。以下是Java配置的示例:

@Configuration
@EnableBatchProcessing
public class EndOfDayJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Bean
    public Job endOfDay() {
        return this.jobBuilderFactory.get("endOfDay")
    				.start(step1())
    				.build();
    }

    @Bean
    public Step step1() {
        return this.stepBuilderFactory.get("step1")
    				.tasklet((contribution, chunkContext) -> null)
    				.build();
    }
}

此示例过于简单,因为通常在Spring Batch中运行批处理作业还有更多要求,但是它可以显示CommandLineJobRunnerJob的两个主要要求。 JobLauncher

退出码

从命令行启动批处理作业时,通常使用企业计划程序。大多数调度程序都非常笨,只能在流程级别上工作。这意味着他们只知道他们正在调用的某些操作系统进程,例如shell脚本。在这种情况下,将作业成功或失败返回给调度程序的唯一方法是通过返回码。返回码是由进程返回到调度程序的数字,指示运行结果。在最简单的情况下:0是成功,1是失败。但是,可能会有更复杂的情况:如果作业A返回4个启动作业B,并且如果作业5返回5个启动作业C。这种类型的行为是在调度程序级别配置的,但重要的是,诸如Spring Batch之类的处理框架必须提供一种方法来返回特定批处理作业的“退出代码”的数字表示形式。在Spring Batch中,它封装在ExitStatus,这将在第5章中更详细地介绍。为了讨论退出代码,要知道的唯一重要的事情是,它 ExitStatus具有退出代码属性,该属性由框架(或开发人员)设置,并作为一部分返回。从 JobExecution退回的款项 JobLauncherCommandLineJobRunner转换此字符串值使用了一些ExitCodeMapper 接口:

public interface ExitCodeMapper {

    public int intValue(String exitCode);

}

an的基本约定 ExitCodeMapper是,给定字符串退出代码,将返回数字表示形式。作业运行程序使用的默认实现是,SimpleJvmExitCodeMapper 它返回0表示完成,返回1表示一般错误,返回2表示任何作业运行程序错误,例如无法Job在提供的上下文中找到 如果需要比上述3个值更复杂的东西,则ExitCodeMapper必须提供接口的自定义实现由于the CommandLineJobRunner是创建的类,ApplicationContext因此无法“连接在一起”,因此必须自动连接任何需要覆盖的值。这意味着,如果 ExitCodeMapperBeanFactory,它将在创建上下文后注入到运行器中。提供您自己的所有操作, ExitCodeMapper就是将实现声明为根级Bean,并确保它是运行ApplicationContext程序加载的Bean的一部分

4.5.2.从Web容器中运行作业

从历史上看,如上所述,已从命令行启动了诸如批处理作业之类的脱机处理。但是,在许多情况下,从中启动HttpRequest是更好的选择。许多此类用例包括报告,临时作业运行和Web应用程序支持。因为按定义,批处理作业可以长期运行,所以最重要的问题是确保异步启动该作业:

Web容器中的异步作业启动器序列
图10. Web容器中的异步作业启动器序列

在这种情况下,控制器是Spring MVC控制器。关于Spring MVC的更多信息可以在这里找到:https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#mvc控制器Job使用 JobLauncher已配置为异步启动的来启动 ,然后立即返回JobExecutionJob可能仍在运行,但是,这种无阻塞行为允许控制器立即返回,该处理的时候需要HttpRequest下面是一个示例:

@Controller
public class JobLauncherController {

    @Autowired
    JobLauncher jobLauncher;

    @Autowired
    Job job;

    @RequestMapping("/jobLauncher.html")
    public void handle() throws Exception{
        jobLauncher.run(job, new JobParameters());
    }
}

4.6.高级元数据使用

到目前为止,已经讨论JobLauncherJobRepository接口。它们一起代表了作业的简单启动和批处理域对象的基本CRUD操作:

工作库
图11.作业存储库

A JobLauncher使用 JobRepository创建新 JobExecution对象并运行它们。 Job并且Step以后的实现会JobRepository在Job运行期间将相同的内容用于相同执行的基本更新。基本操作足以满足简单的场景,但是在具有成百上千个批处理作业和复杂的调度要求的大型批处理环境中,需要对元数据进行更高级的访问:

作业存储库高级
图12.高级作业存储库访问

JobExplorerJobOperator接口,这将在下面讨论的,用于查询和控制元数据添加附加功能。

4.6.1.查询存储库

在使用任何高级功能之前,最基本的需求是能够查询存储库中现有的执行情况。该功能由JobExplorer接口提供

public interface JobExplorer {

    List<JobInstance> getJobInstances(String jobName, int start, int count);

    JobExecution getJobExecution(Long executionId);

    StepExecution getStepExecution(Long jobExecutionId, Long stepExecutionId);

    JobInstance getJobInstance(Long instanceId);

    List<JobExecution> getJobExecutions(JobInstance jobInstance);

    Set<JobExecution> findRunningJobExecutions(String jobName);
}

从上面的方法签名可以明显看出,它 JobExplorer是的只读版本 JobRepository,并且像一样 JobRepository,可以通过工厂bean轻松配置:

Java配置
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	return factoryBean.getObject();
}
...

在本章的前面,提到了JobRepository可以修改表的前缀 以允许使用不同的版本或模式。由于的 JobExplorer使用相同的表,它也需要设置前缀的能力:

Java配置
...
// This would reside in your BatchConfigurer implementation
@Override
public JobExplorer getJobExplorer() throws Exception {
	JobExplorerFactoryBean factoryBean = new JobExplorerFactoryBean();
	factoryBean.setDataSource(this.dataSource);
	factoryBean.setTablePrefix("SYSTEM.");
	return factoryBean.getObject();
}
...

4.6.2.职位注册

一个JobRegistry(和它的父接口JobLocator)是不是强制性的,但如果你想跟踪哪些作业的情况下可用它可能是有用的。当作业已在其他位置(例如,在子上下文中)创建时,对于在应用程序上下文中集中收集作业也很有用。自定义JobRegistry实现还可以用于操纵已注册作业的名称和其他属性。框架仅提供一种实现,该实现基于从作业名称到作业实例的简单映射。

使用时@EnableBatchProcessingJobRegistry开箱即用为您提供a 如果要配置自己的:

...
// This is already provided via the @EnableBatchProcessing but can be customized via
// overriding the getter in the SimpleBatchConfiguration
@Override
@Bean
public JobRegistry jobRegistry() throws Exception {
	return new MapJobRegistry();
}
...

有两种JobRegistry自动填充方法:使用bean后处理器和使用注册商生命周期组件。以下各节将介绍这两种机制。

JobRegistryBeanPostProcessor

这是一个bean后处理器,可以在创建所有作业时注册它们:

Java配置
@Bean
public JobRegistryBeanPostProcessor jobRegistryBeanPostProcessor() {
    JobRegistryBeanPostProcessor postProcessor = new JobRegistryBeanPostProcessor();
    postProcessor.setJobRegistry(jobRegistry());
    return postProcessor;
}

尽管不是严格必需的,但示例中的后处理器已被赋予一个ID,以便可以将其包含在子上下文中(例如,作为父bean定义),并使在那里创建的所有作业也自动注册。

自动工作注册器

这是一个生命周期组件,它创建子上下文并在创建这些上下文时注册这些上下文中的作业。这样做的优点之一是,尽管子上下文中的作业名称在注册表中仍必须是全局唯一的,但它们的依存关系可以具有“自然”名称。因此,例如,您可以创建一组XML配置文件,每个XML配置文件仅具有一个Job,但是都具有具有ItemReader相同bean名称(例如“ reader”)的的不同定义 如果将所有这些文件都导入到同一上下文中,那么阅读器定义将相互冲突并相互替代,但是使用自动注册器可以避免这种情况。这使得集成由应用程序的各个模块贡献的作业变得更加容易。

Java配置
@Bean
public AutomaticJobRegistrar registrar() {

    AutomaticJobRegistrar registrar = new AutomaticJobRegistrar();
    registrar.setJobLoader(jobLoader());
    registrar.setApplicationContextFactories(applicationContextFactories());
    registrar.afterPropertiesSet();
    return registrar;

}

注册服务商具有两个必填属性,一个是的数组 ApplicationContextFactory(此处是从便利的工厂bean创建的),另一个是 JobLoaderJobLoader 负责管理孩子上下文的生命周期和注册的工作JobRegistry

ApplicationContextFactory负责创建子上下文和最常见的用法是如上使用 ClassPathXmlApplicationContextFactory此工厂的功能之一是,默认情况下,它将某些配置从父上下文复制到子级。因此,例如PropertyPlaceholderConfigurer,如果子项或AOP配置应与父项相同,则不必重新定义 它。

所述AutomaticJobRegistrar可以结合使用与JobRegistryBeanPostProcessor 如果需要的话(只要DefaultJobLoader被用作孔)。例如,如果在主父上下文以及子位置中定义了作业,则这可能是理想的。

4.6.3.求职者

如前所述,JobRepository 提供对元数据的CRUD操作,并JobExplorer提供对元数据的 只读操作。但是,与批处理操作员通常一起使用时,这些操作一起用于执行常见的监视任务(如停止,重新启动或汇总作业)时最有用。Spring Batch通过JobOperator接口提供了以下类型的操作

public interface JobOperator {

    List<Long> getExecutions(long instanceId) throws NoSuchJobInstanceException;

    List<Long> getJobInstances(String jobName, int start, int count)
          throws NoSuchJobException;

    Set<Long> getRunningExecutions(String jobName) throws NoSuchJobException;

    String getParameters(long executionId) throws NoSuchJobExecutionException;

    Long start(String jobName, String parameters)
          throws NoSuchJobException, JobInstanceAlreadyExistsException;

    Long restart(long executionId)
          throws JobInstanceAlreadyCompleteException, NoSuchJobExecutionException,
                  NoSuchJobException, JobRestartException;

    Long startNextInstance(String jobName)
          throws NoSuchJobException, JobParametersNotFoundException, JobRestartException,
                 JobExecutionAlreadyRunningException, JobInstanceAlreadyCompleteException;

    boolean stop(long executionId)
          throws NoSuchJobExecutionException, JobExecutionNotRunningException;

    String getSummary(long executionId) throws NoSuchJobExecutionException;

    Map<Long, String> getStepExecutionSummaries(long executionId)
          throws NoSuchJobExecutionException;

    Set<String> getJobNames();

}

上述操作表示从许多不同的接口方法,例如JobLauncherJobRepositoryJobExplorer,和 JobRegistry为此,所提供的实施JobOperatorSimpleJobOperator具有很多依赖:

 /**
  * All injected dependencies for this bean are provided by the @EnableBatchProcessing
  * infrastructure out of the box.
  */
 @Bean
 public SimpleJobOperator jobOperator(JobExplorer jobExplorer,
                                JobRepository jobRepository,
                                JobRegistry jobRegistry) {

	SimpleJobOperator jobOperator = new SimpleJobOperator();

	jobOperator.setJobExplorer(jobExplorer);
	jobOperator.setJobRepository(jobRepository);
	jobOperator.setJobRegistry(jobRegistry);
	jobOperator.setJobLauncher(jobLauncher);

	return jobOperator;
 }

如果您在作业存储库上设置了表前缀,请不要忘记也在作业浏览器上进行设置。

4.6.4.JobParametersIncrementer

上的大多数方法JobOperator都是不言自明的,并且可以在接口javadoc上找到更详细的说明 但是,该 startNextInstance方法值得注意。此方法将始终启动Job的新实例。如果a中存在严重问题,JobExecution并且需要从头开始重新启动作业,这将非常有用 JobLauncher不过,与之不同的是,如果参数与先前的任何参数集不同 ,它都需要一个新JobParameters对象来触发对象 JobInstance,该 startNextInstance方法将使用 JobParametersIncrementer绑定到来 Job将强制Job到新实例:

public interface JobParametersIncrementer {

    JobParameters getNext(JobParameters parameters);

}

的约定JobParametersIncrementer是,给定一个JobParameters 对象,它将通过增加可能包含的任何必要值来返回“下一个” JobParameters对象。该策略很有用,因为该框架无法知道JobParameters对其进行“下一个”实例的更改例如,如果其中唯一的值 JobParameters是日期,并且应该创建下一个实例,那么该值应该增加一天吗?还是一周(例如,如果工作是每周一次)?对于有助于识别Job的任何数值,可以说相同,如下所示:

public class SampleIncrementer implements JobParametersIncrementer {

    public JobParameters getNext(JobParameters parameters) {
        if (parameters==null || parameters.isEmpty()) {
            return new JobParametersBuilder().addLong("run.id", 1L).toJobParameters();
        }
        long id = parameters.getLong("run.id",1L) + 1;
        return new JobParametersBuilder().addLong("run.id", id).toJobParameters();
    }
}

在此示例中,键为“ run.id”的值用于区分JobInstances如果 JobParameters传入的为null,则可以假定Job之前从未运行过,因此可以返回其初始状态。但是,如果不是,则将获得旧值,将其增加一并返回。

增量器可以通过incrementer构建器中提供方法与“作业”相关联

@Bean
public Job footballJob() {
    return this.jobBuilderFactory.get("footballJob")
    				 .incrementer(sampleIncrementer())
    				 ...
                     .build();
}

4.6.5.停止工作

的最常见用例之一 JobOperator是正常停止Job:

Set<Long> executions = jobOperator.getRunningExecutions("sampleJob");
jobOperator.stop(executions.iterator().next());

关闭不是立即关闭的,因为没有办法强制立即关闭,特别是如果当前执行是在框架无法控制的开发人员代码中,例如业务服务。但是,一旦控制权返回到框架,它将把当前状态设置 StepExecutionBatchStatus.STOPPED,保存它,然后对JobExecution结束之前进行相同的操作

4.6.6.中止工作

FAILED可以重新启动的作业执行(如果可以重新Job启动)。状态为的作业执行 ABANDONED不会被框架重新启动。ABANDONED状态还用于步骤执行中,以在重新启动的作业执行中将其标记为可跳过:如果作业正在执行并且遇到ABANDONED在先前的失败作业执行中已标记的步骤, 它将继续进行下一步(确定作业流程定义和步骤执行退出状态)。

如果该进程终止("kill -9"或服务器故障),则该作业当然不会运行,但是JobRepository无法得知,因为在进程终止之前没有人告诉过它。您必须手动告诉它,您知道执行失败或应被视为中止(将其状态更改为 FAILEDABANDONED)-这是一项业务决策,无法自动执行。FAILED当不可重启或知道重启数据有效时,才将状态更改为。Spring Batch Admin中有一个实用程序JobService来中止作业执行。

5.配置 Step

正如在领域一章中讨论的那样,a Step是一个域对象,它封装了批处理作业的一个独立的顺序阶段,并且包含定义和控制实际批处理所需的所有信息。这是一个模糊的描述,因为任何给定的内容 Step都是由开发人员决定编写的JobA Step可以根据开发人员的需求简单或复杂。一个简单的方法Step可能会将文件中的数据加载到数据库中,几乎不需要代码(取决于所使用的实现)。较复杂的Step业务规则可能会在处理过程中应用到复杂的业务规则,如下图所示:

步
图13.步骤

5.1.块处理

Spring Batch在最常见的实现中使用“面向块的”处理风格。面向块的处理是指一次读取一个数据并创建在事务边界内写出的“块”。从读入一项ItemReader,交给一项ItemProcessor,然后汇总。一旦读取的项目数等于提交间隔,整个块就由写入 ItemWriter,然后提交事务。下图显示了该过程:

块处理
图14.块处理

以下代码显示了相同的概念:

List items = new Arraylist();
for(int i = 0; i < commitInterval; i++){
    Object item = itemReader.read()
    Object processedItem = itemProcessor.process(item);
    items.add(processedItem);
}
itemWriter.write(items);

5.1.1.配置一个Step

尽管a所需依赖项的列表相对较短Step,但它是一个极其复杂的类,可能包含许多协作者。

使用Java配置时,可以使用Spring Batch构建器,如以下示例所示:

Java配置
/**
 * Note the JobRepository is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Job sampleJob(JobRepository jobRepository, Step sampleStep) {
    return this.jobBuilderFactory.get("sampleJob")
    			.repository(jobRepository)
                .start(sampleStep)
                .build();
}

/**
 * Note the TransactionManager is typically autowired in and not needed to be explicitly
 * configured
 */
@Bean
public Step sampleStep(PlatformTransactionManager transactionManager) {
	return this.stepBuilderFactory.get("sampleStep")
				.transactionManager(transactionManager)
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

上面的配置包括创建面向项目的步骤所需的唯一依赖项:

  • readerItemReader提供要处理的项目。

  • writerItemWriter处理所提供项目的ItemReader

  • transactionManager:Spring的PlatformTransactionManager开始,并在处理过程中提交事务。

  • repository在处理期间(即将提交之前)JobRepository定期存储StepExecution和的 ExecutionContext

  • chunk:表示这是基于项目的步骤,是提交事务之前要处理的项目数。

应当注意的是,repository默认jobRepositorytransactionManager 默认transactionManger(通过从基础设施都提供了 @EnableBatchProcessing)。另外,ItemProcessor由于该项目可以直接从阅读器传递给编写器,因此可选。

5.1.3.提交间隔

如前所述,一个步骤读取和写入项目,并使用提供的定期提交PlatformTransactionManager随着commit-interval1,将其写入各个项目后提交。这在许多情况下都不理想,因为开始和提交事务非常昂贵。理想情况下,最好在每个事务中处理尽可能多的项目,这完全取决于要处理的数据类型和与之交互的资源。因此,可以配置在提交中处理的项目数。下面的示例示出了steptasklet具有commit-interval 值10.

Java配置
@Bean
public Job sampleJob() {
    return this.jobBuilderFactory.get("sampleJob")
                     .start(step1())
                     .end()
                     .build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.build();
}

在前面的示例中,每个事务处理10个项目。在处理开始时,开始交易。同样,每次read在上调用时 ItemReader,计数器都会递增。达到10时,汇总项目列表将传递到ItemWriter,事务将被提交。

5.1.4.配置Step重启

在“ 配置和运行作业 ”部分中,Job讨论了重新启动 重新启动对步骤有很多影响,因此可能需要一些特定的配置。

设置开始限制

在许多情况下,您可能希望控制a的Step启动次数。例如,Step可能需要配置某个特定对象,使其仅运行一次,因为它会使某些必须手动修复的资源失效,然后才能再次运行。这可以在步骤级别上配置,因为不同的步骤可能有不同的要求。Step可仅被执行一次可以作为相同的一部分存在Job作为Step可无限运行。以下代码片段显示了启动限制配置的示例:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.startLimit(1)
				.build();
}

上面的步骤只能运行一次。尝试再次运行它 StartLimitExceededException会引发。请注意,开始限制的默认值为Integer.MAX_VALUE

重新启动已完成 Step

对于可重新启动的作业,可能有一个或多个步骤应始终运行,无论它们是否第一次成功。例如验证步骤或Step在处理之前清理资源的步骤在正常处理重新启动的作业期间,将跳过状态为“已完成”(表示已成功完成)的任何步骤。设置allow-start-if-complete为“ true”将覆盖此设置,以便该步骤始终运行,如以下示例所示:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(itemReader())
				.writer(itemWriter())
				.allowStartIfComplete(true)
				.build();
}
Step 重新启动配置示例

以下示例显示如何配置作业以使其具有可以重新启动的步骤:

Java配置
@Bean
public Job footballJob() {
	return this.jobBuilderFactory.get("footballJob")
				.start(playerLoad())
				.next(gameLoad())
				.next(playerSummarization())
				.end()
				.build();
}

@Bean
public Step playerLoad() {
	return this.stepBuilderFactory.get("playerLoad")
			.<String, String>chunk(10)
			.reader(playerFileItemReader())
			.writer(playerWriter())
			.build();
}

@Bean
public Step gameLoad() {
	return this.stepBuilderFactory.get("gameLoad")
			.allowStartIfComplete(true)
			.<String, String>chunk(10)
			.reader(gameFileItemReader())
			.writer(gameWriter())
			.build();
}

@Bean
public Step playerSummarization() {
	return this.stepBuilderFactor.get("playerSummarization")
			.startLimit(2)
			.<String, String>chunk(10)
			.reader(playerSummarizationSource())
			.writer(summaryWriter())
			.build();
}

前面的示例配置用于一项作业,该作业加载有关足球比赛的信息并进行总结。它包含三个步骤:playerLoadgameLoad,和 playerSummarizationplayerLoad步骤从平面文件加载玩家信息,而gameLoad游戏则执行相同的操作。然后,最后一步会 playerSummarization根据提供的游戏汇总每个玩家的统计信息。假定由playerLoad加载的文件只能加载一次,但是gameLoad可以加载在特定目录中找到的任何游戏,并在将它们成功加载到数据库后将其删除。因此,该playerLoad步骤不包含其他配置。它可以启动任意次,如果完成,将被跳过。gameLoad但是,如果自上次运行以来添加了额外的文件,则每次都需要运行该步骤。为了始终启动,将“ allow-start-if-complete”设置为“ true”。(假定已将数据库表游戏加载到其上,并且具有过程指示器,以确保可以通过摘要步骤正确找到新游戏)。汇总步骤是作业中最重要的步骤,配置为起始限制为2.这很有用,因为如果该步骤连续失败,则会将新的退出代码返回给控制作业执行的操作员,并且可以在进行手动干预之前,请勿重新开始。

这项工作提供了本文档的示例footballJob ,与示例项目中的示例不同。

本节的其余部分描述了footballJob示例的三个运行中的每个运行情况

运行1:

  1. playerLoad 运行并成功完成,将400位玩家添加到“玩家”表中。

  2. gameLoad 运行并处理11个有价值的游戏数据文件,并将其内容加载到“ GAMES”表中。

  3. playerSummarization 开始处理,并在5分钟后失败。

运行2:

  1. playerLoad无法运行,因为它已经成功完成,并且 allow-start-if-complete为“ false”(默认)。

  2. gameLoad 再次运行并处理另外两个文件,并将它们的内容也加载到“ GAMES”表中(带有过程指示器,指示它们尚未处理)。

  3. playerSummarization 开始处理所有剩余的游戏数据(使用进程指示器进行过滤),并在30分钟后再次失败。

运行3:

  1. playerLoad无法运行,因为它已经成功完成,并且 allow-start-if-complete为“ false”(默认)。

  2. gameLoad 再次运行并处理另外两个文件,并将它们的内容也加载到“ GAMES”表中(带有过程指示器,指示它们尚未处理)。

  3. playerSummarization不会启动,并且作业立即被终止,因为这是的第三次执行playerSummarization,并且其限制仅为2.或者必须提高限制,或者Job必须作为new执行JobInstance

5.1.5.配置跳过逻辑

在许多情况下,处理过程中遇到的错误不应导致 Step失败,而应跳过。通常这是必须由了解数据本身及其含义的人做出的决定。例如,财务数据可能无法跳过,因为它会导致资金被转移,这需要完全准确。另一方面,加载供应商列表可能会导致跳过。如果由于格式错误或缺少必要的信息而未加载供应商,则可能没有问题。通常,这些不良记录也会被记录下来,稍后在讨论侦听器时将予以介绍。

以下示例显示了使用跳过限制的示例:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(FlatFileParseException.class)
				.build();
}

在前面的示例中,FlatFileItemReader使用a。如果在任何时候 FlatFileParseException抛出a,则跳过该项目并将其计入总跳过限制10.在块处理的任何阶段(读取,处理,写入)都可能引发声明的异常(及其子类)。但是单独的计数是在步骤执行内对读取,处理和写入的跳过进行的,但是限制适用于所有跳过。一旦达到跳过限制,找到的下一个异常将导致该步骤失败。换句话说,第十一跳会触发异常,而不是第十。

前面的示例的一个问题是,除a之外的任何其他异常 FlatFileParseException都会导致Job失败。在某些情况下,这可能是正确的行为。但是,在其他情况下,可能更容易确定哪些异常应导致失败并跳过其他所有内容,如以下示例所示:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(flatFileItemReader())
				.writer(itemWriter())
				.faultTolerant()
				.skipLimit(10)
				.skip(Exception.class)
				.noSkip(FileNotFoundException.class)
				.build();
}

通过将其标识java.lang.Exception为可跳过的异常类,该配置指示所有Exceptions跳过的异常但是,通过“排除” java.io.FileNotFoundException,配置将可跳过的异常类的列表优化为全部Exceptions 除外 FileNotFoundException如果遇到任何排除的异常类都是致命的(也就是说,它们不会被跳过)。

对于遇到的任何异常,可跳过性由类层次结构中最接近的超类确定。任何未分类的异常均被视为“致命”异常。

skipnoSkip调用的顺序无关紧要。

5.1.6.配置重试逻辑

在大多数情况下,您希望异常导致跳过或Step失败。但是,并非所有例外都是确定性的。如果FlatFileParseException在读取时遇到a ,则始终将其抛出该记录。重置ItemReader并没有帮助。但是,对于其他异常(例如)DeadlockLoserDataAccessException,表示当前进程已尝试更新另一个进程已锁定的记录,请等待并重试可能会导致成功。在这种情况下,应按以下方式配置重试:

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.retryLimit(3)
				.retry(DeadlockLoserDataAccessException.class)
				.build();
}

Step允许的时间中的个别项目可以重试次数,并且是“重试”的例外列表的限制。有关重试如何工作的更多详细信息,请参见 retry

5.1.7.控制回滚

缺省情况下,无论重试还是跳过,都将抛出任何异常,这些异常ItemWriter 是由Stepto 控制的事务回滚的原因。如果如前所述配置了skip,则从抛出的异常ItemReader不会导致回滚。但是,在许多情况下,从中引发的异常ItemWriter不应导致回滚,因为没有采取任何行动来使事务无效。出于这个原因,Step可以使用不引起回滚的异常列表来配置,如以下示例所示:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.faultTolerant()
				.noRollback(ValidationException.class)
				.build();
}
交易读者

的基本契约ItemReader是仅向前。该步骤缓冲读取器的输入,因此在回滚的情况下,不需要从读取器重新读取项目。但是,在某些情况下,阅读器是建立在诸如JMS队列之类的事务资源之上的。在这种情况下,由于队列与回滚的事务相关联,因此将从队列中拉出的消息放回原处。因此,可以将步骤配置为不缓冲项目,如以下示例所示:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.readerIsTransactionalQueue()
				.build();
}

5.1.8.交易属性

事务属性可以用来控制isolationpropagationtimeout设置。有关设置事务属性的更多信息,请参见 Spring核心文档下面的示例设置isolationpropagation以及 timeout交易属性:

Java配置
@Bean
public Step step1() {
	DefaultTransactionAttribute attribute = new DefaultTransactionAttribute();
	attribute.setPropagationBehavior(Propagation.REQUIRED.value());
	attribute.setIsolationLevel(Isolation.DEFAULT.value());
	attribute.setTimeout(30);

	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(itemWriter())
				.transactionAttribute(attribute)
				.build();
}

5.1.9.注册ItemStream一个Step

该步骤ItemStream必须在其生命周期中的必要时间进行回调(有关ItemStream接口的更多信息,请参见 ItemStream)。如果步骤失败并且可能需要重新启动,这是至关重要的,因为ItemStream接口是步骤获取有关执行之间持久状态的所需信息接口。

如果ItemReaderItemProcessorItemWriter本身实现了ItemStream 接口,那么这些被自动注册。任何其他流都需要单独注册。在将间接依赖项(例如委托)注入到读取器和写入器中时,通常是这种情况。可以Step通过'streams'元素将流注册 ,如以下示例所示:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(itemReader())
				.writer(compositeItemWriter())
				.stream(fileItemWriter1())
				.stream(fileItemWriter2())
				.build();
}

/**
 * In Spring Batch 4, the CompositeItemWriter implements ItemStream so this isn't
 * necessary, but used for an example.
 */
@Bean
public CompositeItemWriter compositeItemWriter() {
	List<ItemWriter> writers = new ArrayList<>(2);
	writers.add(fileItemWriter1());
	writers.add(fileItemWriter2());

	CompositeItemWriter itemWriter = new CompositeItemWriter();

	itemWriter.setDelegates(writers);

	return itemWriter;
}

在上面的示例中,CompositeItemWriter不是ItemStream,但两个委托都是。因此,必须将两个委托编写者都明确注册为流,以便框架正确处理它们。ItemReader不需要也被明确登记为甲流,因为它的一个直接财产Step该步骤现在可以重新启动,并且在发生故障的情况下,读取器和写入器的状态可以正确保留。

5.1.10.拦截Step执行

与一样Job,在执行期间Step可能会有许多事件,用户可能需要执行某些功能。例如,为了写出需要页脚的平面文件,需要在完成后ItemWriter通知需求Step,以便可以编写页脚。这可以通过许多Step作用域侦听器之一来完成

任何实现扩展之一的类StepListener(但由于接口本身是空的,所以不能实现该接口本身)可以应用于listeners元素的单步操作listeners元素在步骤,tasklet或块声明中有效。建议您在应用其功能的级别声明侦听器,或者,如果它具有多种功能(例如StepExecutionListenerItemReadListener),则在其应用的最细粒度级别声明它。以下示例显示了在块级别应用的侦听器:

Java配置
@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(10)
				.reader(reader())
				.writer(writer())
				.listener(chunkListener())
				.build();
}

ItemReaderItemWriter或者ItemProcessor是本身实现的一个 StepListener接口,自动注册的Step,如果使用命名空间<step>元素或在一个*StepFactoryBean工厂。这仅适用于直接注入的组件Step如果侦听器嵌套在另一个组件中,则需要对其进行显式注册(如先前在使用进行 注册ItemStream中所述Step)。

除了StepListener界面之外,还提供注释来解决相同的问题。普通的旧Java对象可以使用带有这些批注的方法,然后将其转换为相应的StepListener类型。它也是常见的块部件,诸如注释的自定义实现ItemReaderItemWriterTaskletXML解析器会针对<listener/>元素对注释进行分析,listener在构建器中使用方法进行注册,因此您所需要做的就是使用XML名称空间或构建器通过步骤注册侦听器。

StepExecutionListener

StepExecutionListener代表最通用的Step执行侦听器它允许在a Step开始之前和结束之后进行通知,无论它是正常结束还是失败,如以下示例所示:

public interface StepExecutionListener extends StepListener {

    void beforeStep(StepExecution stepExecution);

    ExitStatus afterStep(StepExecution stepExecution);

}

ExitStatus是的返回类型,afterStep以使侦听器有机会修改在完成时返回的退出代码Step

与此接口对应的注释为:

  • @BeforeStep

  • @AfterStep

ChunkListener

块定义为在事务范围内处理的项目。在每个提交间隔提交事务都会提交一个“块”。ChunkListener可以在块开始处理之前或块成功完成之后使用A 执行逻辑,如以下接口定义所示:

public interface ChunkListener extends StepListener {

    void beforeChunk(ChunkContext context);
    void afterChunk(ChunkContext context);
    void afterChunkError(ChunkContext context);

}

在事务开始之后但在上调用read之前,调用beforeChunk方法ItemReader相反,afterChunk在提交块后调用(如果回滚则根本不调用)。

与此接口对应的注释为:

  • @BeforeChunk

  • @AfterChunk

  • @AfterChunkError

ChunkListener没有块声明时可以应用A。TaskletStep负责调用的ChunkListener,所以它适用于非面向项目-微进程以及(它是在微进程之前和之后调用)。

ItemReadListener

先前在讨论跳过逻辑时,曾提到记录跳过的记录可能会有所帮助,以便稍后进行处理。如果发生读取错误,可以使用来完成ItemReaderListener,如以下接口定义所示:

public interface ItemReadListener<T> extends StepListener {

    void beforeRead();
    void afterRead(T item);
    void onReadError(Exception ex);

}

beforeRead方法是在每次调用前叫上阅读ItemReaderafterRead在每次成功调用read之后,将调用方法,并将已读取的项目传递给方法。如果读取时出错,onReadError则调用方法。提供了遇到的异常,以便可以将其记录下来。

与此接口对应的注释为:

  • @BeforeRead

  • @AfterRead

  • @OnReadError

ItemProcessListener

与一样ItemReadListener,可以“监听”项目的处理,如以下接口定义所示:

public interface ItemProcessListener<T, S> extends StepListener {

    void beforeProcess(T item);
    void afterProcess(T item, S result);
    void onProcessError(T item, Exception e);

}

beforeProcess方法在process先调用,然后ItemProcessor将要处理的项目交给该方法。afterProcess成功处理项目后,将调用方法。如果处理时出错,onProcessError则调用方法。提供遇到的异常和尝试处理的项目,以便可以记录它们。

与此接口对应的注释为:

  • @BeforeProcess

  • @AfterProcess

  • @OnProcessError

ItemWriteListener

可以使用来“侦听”项目的编写ItemWriteListener,如以下接口定义所示:

public interface ItemWriteListener<S> extends StepListener {

    void beforeWrite(List<? extends S> items);
    void afterWrite(List<? extends S> items);
    void onWriteError(Exception exception, List<? extends S> items);

}

beforeWrite方法在write先被调用ItemWriter并且将被写入的项目列表移交给方法afterWrite成功写入项目后,将调用方法。如果写入时发生错误,onWriteError则调用方法。提供遇到的异常以及尝试写入的项目,以便可以记录它们。

与此接口对应的注释为:

  • @BeforeWrite

  • @AfterWrite

  • @OnWriteError

SkipListener

ItemReadListenerItemProcessListenerItemWriteListener都提供了用于通知错误的机制,但没有一个机制可通知您实际上已跳过了一条记录。onWriteError例如,即使重试并成功执行项目,也会被调用。因此,有一个单独的界面可以跟踪跳过的项目,如以下界面定义所示:

public interface SkipListener<T,S> extends StepListener {

    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);

}

onSkipInRead在阅读时每跳过一个项目就会调用一次。应当注意,回滚可能导致同一项被注册为多次跳过。 onSkipInWrite写入时跳过项目时调用。由于已成功读取(而不是跳过)该项目,因此还提供了该项目本身作为参数。

与此接口对应的注释为:

  • @OnSkipInRead

  • @OnSkipInWrite

  • @OnSkipInProcess

跳过侦听器和事务

a的最常见用例之一SkipListener是注销跳过的项目,以便可以使用另一个批处理甚至人工流程来评估和解决导致跳过的问题。由于在很多情况下原始交易可能会被回滚,因此Spring Batch提供了两个保证:

  1. 每个项目仅调用一次适当的跳过方法(取决于错误发生的时间)。

  2. SkipListener始终调用事务提交之前。这是为了确保侦听器调用的任何事务性资源都不会由于失败而回滚ItemWriter

5.2. TaskletStep

面向块的处理不是在中处理的唯一方法 Step如果Step必须包含一个简单的存储过程调用怎么办?您可以将调用实现为,ItemReader并在过程完成后返回null。但是,这样做有点不自然,因为需要没有操作ItemWriterSpring Batch TaskletStep为此提供了方案。

Tasklet是一个具有一个方法的简单接口,该方法execute被反复调用,TaskletStep直到它返回RepeatStatus.FINISHED或引发异常以指示失败。每个对a的调用Tasklet都包装在一个事务中。 Tasklet实现者可以调用存储过程,脚本或简单的SQL更新语句。

要创建一个TaskletStep,传递给tasklet构建器方法的bean 应该实现该Tasklet接口。chunk建立时不应 呼叫to TaskletStep以下示例显示了一个简单的任务集:

@Bean
public Step step1() {
    return this.stepBuilderFactory.get("step1")
    			.tasklet(myTasklet())
    			.build();
}

TaskletStep自动将Tasklet注册为StepListener实现StepListener 接口的对象。

5.2.1. TaskletAdapter

ItemReaderItemWriter接口的其他适配器一样,该Tasklet 接口包含一个实现,使自己可以适应任何现有的类:TaskletAdapter一个可能有用的示例是现有的DAO,用于更新一组记录上的标志。TaskletAdapter可用于调用这个类而不必编写一个适配器的Tasklet接口,如图以下示例:

Java配置
@Bean
public MethodInvokingTaskletAdapter myTasklet() {
	MethodInvokingTaskletAdapter adapter = new MethodInvokingTaskletAdapter();

	adapter.setTargetObject(fooDao());
	adapter.setTargetMethod("updateFoo");

	return adapter;
}

5.2.2.示例Tasklet实施

许多批处理作业包含必须在主处理开始之前执行的步骤,以设置各种资源,或者在处理完成后清理这些资源。对于需要大量处理文件的工作,通常需要在成功将文件上载到其他位置后在本地删除某些文件。以下示例(摘自 Spring Batch示例项目)是Tasklet具有这种责任的实现:

public class FileDeletingTasklet implements Tasklet, InitializingBean {

    private Resource directory;

    public RepeatStatus execute(StepContribution contribution,
                                ChunkContext chunkContext) throws Exception {
        File dir = directory.getFile();
        Assert.state(dir.isDirectory());

        File[] files = dir.listFiles();
        for (int i = 0; i < files.length; i++) {
            boolean deleted = files[i].delete();
            if (!deleted) {
                throw new UnexpectedJobExecutionException("Could not delete file " +
                                                          files[i].getPath());
            }
        }
        return RepeatStatus.FINISHED;
    }

    public void setDirectoryResource(Resource directory) {
        this.directory = directory;
    }

    public void afterPropertiesSet() throws Exception {
        Assert.notNull(directory, "directory must be set");
    }
}

前面的Tasklet实现删除给定目录内的所有文件。应当注意,该execute方法仅被调用一次。所有剩下的就是引用TaskletStep

Java配置
@Bean
public Job taskletJob() {
	return this.jobBuilderFactory.get("taskletJob")
				.start(deleteFilesInDir())
				.build();
}

@Bean
public Step deleteFilesInDir() {
	return this.stepBuilderFactory.get("deleteFilesInDir")
				.tasklet(fileDeletingTasklet())
				.build();
}

@Bean
public FileDeletingTasklet fileDeletingTasklet() {
	FileDeletingTasklet tasklet = new FileDeletingTasklet();

	tasklet.setDirectoryResource(new FileSystemResource("target/test-outputs/test-dir"));

	return tasklet;
}

5.3.控制步骤流

由于能够将一个拥有的作业中的各个步骤分组在一起,因此需要能够控制该作业如何从一个步骤“流”到另一个步骤。a的失败Step并不一定意味着Job应当失败。此外,可能有不止一种类型的“成功”确定Step下一步应执行什么。根据一组的Steps配置方式,某些步骤甚至可能根本不执行。

5.3.1.顺序流

最简单的流程场景是一项工作,其中所有步骤都按顺序执行,如下图所示:

顺序流
图15.顺序流

可以通过使用step元素的'next'属性来实现,如以下示例所示:

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(stepA())
				.next(stepB())
				.next(stepC())
				.build();
}

在上述情况下,“步骤A”首先运行,因为它是第一个Step列出的。如果“步骤A”正常完成,则“步骤B”运行,依此类推。但是,如果“步骤A”失败,则整个Job失败,并且“步骤B”不执行。

5.3.2.条件流

在上面的示例中,只有两种可能性:

  1. Step成功,接下来Step应该执行。

  2. Step失败,因此,在Job出现故障。

在许多情况下,这可能就足够了。但是,如果a的失败Step应该触发不同的Step而不是导致失败的情况呢?下图显示了这样的流程:

条件流
图16.条件流

为了处理更复杂的场景,Spring Batch名称空间允许在step元素内定义转换元素。next 元素就是这样一种过渡next属性一样next元素告诉下一步执行Job哪个Step但是,与该属性不同,next给定允许任何数量的元素Step,并且在失败的情况下没有默认行为。这意味着,如果使用过渡元素,则Step必须显式定义过渡的所有行为还要注意,单个步骤不能同时具有next属性和transition元素。

next元素指定模式匹配和步骤接下来要执行,如图以下示例:

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(stepA())
				.on("*").to(stepB())
				.from(stepA()).on("FAILED").to(stepC())
				.end()
				.build();
}

使用java配置时,该on方法使用简单的模式匹配方案来匹配ExitStatus执行时产生的结果Step

模式中仅允许使用两个特殊字符:

  • “ *”匹配零个或多个字符

  • “?” 完全匹配一个字符

例如,“ c * t”匹配“ cat”和“ count”,而“ c?t”匹配“ cat”但不匹配“ count”。

虽然上的过渡元素数量没有限制,但Step如果Step 执行导致ExitStatus元素未涵盖的过渡元素,则框架将引发异常,并且Job失败。框架自动排序从最具体到最不具体的过渡。这意味着,即使在上面的示例中将订单交换为“ stepA” ExitStatus,“ FAILED”仍将转到“ stepC”。

批处理状态与退出状态

当配置Job为条件流,理解之间的区别是非常重要的BatchStatusExitStatusBatchStatus是枚举是两者的性质JobExecutionStepExecution,并用于由框架以记录的状态JobStep它可以是以下值之一: COMPLETEDSTARTINGSTARTEDSTOPPINGSTOPPEDFAILEDABANDONED,或 UNKNOWN其中大多数是不言自明的:COMPLETED是在步骤或作业成功完成时设置的状态,还是在步骤或作业成功完成FAILED时设置的状态,依此类推。

以下示例在使用Java配置时包含'on'元素:

...
.from(stepA()).on("FAILED").to(stepB())
...

乍一看,它会出现“上”引用BatchStatusStep其所属。但是,它实际上引用ExitStatusStep顾名思义,ExitStatus代表a Step完成执行后的状态

用英语说:“如果退出代码为,则转到步骤B FAILED”。默认情况下,退出代码始终是相同BatchStatusStep,这就是为什么上述工作的条目。但是,如果退出代码需要不同怎么办?一个很好的例子来自示例项目中的跳过示例作业:

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1()).on("FAILED").end()
			.from(step1()).on("COMPLETED WITH SKIPS").to(errorPrint1())
			.from(step1()).on("*").to(step2())
			.end()
			.build();
}

step1 有三种可能性:

  1. Step失败,在这种情况下,工作应该失败。

  2. Step成功完成。

  3. Step成功但退出代码完成“完成,跳过”。在这种情况下,应运行不同的步骤来处理错误。

以上配置有效。但是,需要根据跳过了记录的执行条件来更改退出代码,如以下示例所示:

public class SkipCheckingListener extends StepExecutionListenerSupport {
    public ExitStatus afterStep(StepExecution stepExecution) {
        String exitCode = stepExecution.getExitStatus().getExitCode();
        if (!exitCode.equals(ExitStatus.FAILED.getExitCode()) &&
              stepExecution.getSkipCount() > 0) {
            return new ExitStatus("COMPLETED WITH SKIPS");
        }
        else {
            return null;
        }
    }
}

上面的代码是a StepExecutionListener,首先检查以确保Step成功,然后检查上的跳过计数StepExecution是否大于0.如果同时满足两个条件ExitStatusCOMPLETED WITH SKIPS则返回退出代码为的新代码

5.3.3.配置停止

的讨论后BatchStatus和退出状态,人们可能会问如何BatchStatus以及ExitStatus为确定Job虽然这些状态是Step由执行的代码确定的,但是的状态Job是根据配置确定的。

到目前为止,讨论的所有工作配置都至少具有一个Step没有过渡的决赛例如,执行以下步骤后,Job结束,如以下示例所示:

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(step1())
				.build();
}

如果没有为定义转换Step,则的状态Job定义如下:

  • 如果Step有端ExitStatus失败,则该BatchStatusExitStatusJob都是FAILED

  • 否则,BatchStatusExitStatus中的Job和均为COMPLETED

尽管这种终止批处理作业的方法对于某些批处理作业(例如简单的顺序步骤作业)已足够,但是可能需要自定义定义的作业停止方案。为此,Spring Batch提供了三个过渡元素来停止a Job(除了我们之前讨论next元素之外)。这些停止元素中的每一个都停止Job特定的BatchStatus值得注意的是,停止过渡元素对任一没有影响是很重要的 BatchStatusExitStatus任何StepsJob这些元素仅影响的最终状态Job例如,作业中的每个步骤都可能具有的状态,FAILED而作业的状态可能是COMPLETED

一步一步结束

配置差端指示一个Job停止了BatchStatusCOMPLETED一个 Job已经用状态完成后COMPLETED无法重新启动(框架抛出一个JobInstanceAlreadyCompleteException)。

使用Java配置时,“ end”方法用于此任务。end方法还允许使用可选的'exitStatus'参数,该参数可用于自定义 ExitStatusJob如果没有提供“退出状态”值,则ExitStatusCOMPLETED在默认情况下,以匹配BatchStatus

在下列情况下,如果step2失败,则Job有停止BatchStatusCOMPLETEDExitStatusCOMPLETEDstep3不运行。否则,执行移至step3请注意,如果step2失败,Job则无法重新启动(因为状态为COMPLETED)。

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(step1())
				.next(step2())
				.on("FAILED").end()
				.from(step2()).on("*").to(step3())
				.end()
				.build();
}
失步

配置步骤在给定的点失败指示一个Job停止用 BatchStatusFAILED与end不同,a的故障Job不会阻止a Job 重新启动。

在下列情况下,如果step2失败,则Job有停止BatchStatusFAILEDExitStatusEARLY TERMINATIONstep3不执行。否则,执行移至step3此外,如果step2失败并Job重新启动,则从再次开始执行step2

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1())
			.next(step2()).on("FAILED").fail()
			.from(step2()).on("*").to(step3())
			.end()
			.build();
}
在给定步骤停止作业

在某些特定步骤配置工作,停止指示一个Job停止用 BatchStatusSTOPPED停止Job可以暂时中断处理,以便操作员可以在重新启动之前采取一些措施Job

使用Java配置时,该stopAndRestart方法需要一个'restart'属性,该属性指定“作业重新启动”时执行执行的步骤。

在以下情况下,如果以step1结尾COMPLETE,则作业将停止。重新启动后,执行从开始step2

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1()).on("COMPLETED").stopAndRestart(step2())
			.end()
			.build();
}

5.3.4.程序流程决策

在某些情况下,ExitStatus可能需要比决定下一步执行更多的信息在这种情况下,JobExecutionDecider可以使用a来辅助决策,如以下示例所示:

public class MyDecider implements JobExecutionDecider {
    public FlowExecutionStatus decide(JobExecution jobExecution, StepExecution stepExecution) {
        String status;
        if (someCondition()) {
            status = "FAILED";
        }
        else {
            status = "COMPLETED";
        }
        return new FlowExecutionStatus(status);
    }
}

在以下示例中,使用Java配置时,将实现的bean JobExecutionDecider直接传递给next调用。

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
			.start(step1())
			.next(decider()).on("FAILED").to(step2())
			.from(decider()).on("COMPLETED").to(step3())
			.end()
			.build();
}

5.3.5.分流

到目前为止,所描述的每种情况都涉及一个Job,它以线性方式一次执行一个步骤。除了这种典型的样式外,Spring Batch还允许使用并行流来配置作业。

基于Java的配置使您可以通过提供的构建器配置拆分。如下例所示,“ split”元素包含一个或多个“ flow”元素,可以在其中定义整个单独的流。“拆分”元素还可以包含任何先前讨论的过渡元素,例如“下一个”属性或“下一个”,“结束”或“失败”元素。

@Bean
public Job job() {
	Flow flow1 = new FlowBuilder<SimpleFlow>("flow1")
			.start(step1())
			.next(step2())
			.build();
	Flow flow2 = new FlowBuilder<SimpleFlow>("flow2")
			.start(step3())
			.build();

	return this.jobBuilderFactory.get("job")
				.start(flow1)
				.split(new SimpleAsyncTaskExecutor())
				.add(flow2)
				.next(step4())
				.end()
				.build();
}

5.3.6.外化流程定义和作业之间的依赖关系

可以将作业中的部分流程外部化为单独的Bean定义,然后重新使用。有两种方法可以这样做。首先是简单地将流声明为对其他地方定义的流的引用,如以下示例所示:

Java配置
@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(flow1())
				.next(step3())
				.end()
				.build();
}

@Bean
public Flow flow1() {
	return new FlowBuilder<SimpleFlow>("flow1")
			.start(step1())
			.next(step2())
			.build();
}

如前面的示例所示,定义外部流程的效果是将外部流程中的步骤插入作业中,就像它们已被内联声明一样。这样,许多作业可以引用相同的模板流,并将这些模板组成不同的逻辑流。这也是分离单个流的集成测试的好方法。

外部化流程的另一种形式是使用JobStepA JobStep与a相似, FlowStep但实际上为指定流程中的步骤创建并启动单独的作业执行。

以下Java代码段显示了一个示例JobStep

Java配置
@Bean
public Job jobStepJob() {
	return this.jobBuilderFactory.get("jobStepJob")
				.start(jobStepJobStep1(null))
				.build();
}

@Bean
public Step jobStepJobStep1(JobLauncher jobLauncher) {
	return this.stepBuilderFactory.get("jobStepJobStep1")
				.job(job())
				.launcher(jobLauncher)
				.parametersExtractor(jobParametersExtractor())
				.build();
}

@Bean
public Job job() {
	return this.jobBuilderFactory.get("job")
				.start(step1())
				.build();
}

@Bean
public DefaultJobParametersExtractor jobParametersExtractor() {
	DefaultJobParametersExtractor extractor = new DefaultJobParametersExtractor();

	extractor.setKeys(new String[]{"input.file"});

	return extractor;
}

作业参数提取器是一种策略,用于确定将ExecutionContextfor Step转换为JobParametersfor的Job方式。JobStep当你想为监测和对就业和步骤报告一些更精细的选择是非常有用的。使用JobStep通常还可以很好地回答以下问题:“如何在作业之间创建依赖关系?” 这是将大型系统分解为较小的模块并控制作业流程的好方法。

5.4.JobStep属性的后期绑定

前面显示的XML和平面文件示例都使用Spring Resource抽象来获取文件。之所以有效,是因为Resource具有getFile方法,该方法返回 java.io.File可以使用标准Spring构造来配置XML和平面文件资源,如以下示例所示:

Java配置
@Bean
public FlatFileItemReader flatFileItemReader() {
	FlatFileItemReader<Foo> reader = new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource("file://outputs/file.txt"))
			...
}

前面Resource的内容从指定的文件系统位置加载文件。请注意,绝对位置必须以双斜杠(//开头在大多数Spring应用程序中,此解决方案足够好,因为这些资源的名称在编译时就已知。但是,在批处理方案中,可能需要在运行时将文件名确定为作业的参数。可以使用“ -D”参数读取系统属性来解决。

以下Java代码段显示了如何从属性读取文件名:

Java配置
@Bean
public FlatFileItemReader flatFileItemReader(@Value("${input.file.name}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

要使该解决方案有效,所需要做的只是一个系统参数(例如 -Dinput.file.name="file://outputs/file.txt")。

尽管PropertyPlaceholderConfigurer可以在此处使用a ,但是始终设置系统属性不是必需的,因为ResourceEditorSpring已经过滤并在系统属性上进行了占位符替换。

通常,在批处理设置中,最好JobParameters在作业的中参数化文件名 ,而不是通过系统属性来进行访问,并以这种方式进行访问。为了实现这一点,Spring Batch允许后期绑定各个JobStep属性,如以下代码片段所示:

Java配置
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

无论是JobExecutionStepExecution电平ExecutionContext可以以相同的方式被访问,如图所示在以下实施例:

Java配置
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
Java配置
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{stepExecutionContext['input.file.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

任何使用后期绑定的bean必须使用scope =“ step”声明。有关更多信息,请参见 步骤范围

5.4.1.步骤范围

上面的所有后期绑定示例在bean定义上都声明了一个“ step”范围,如以下示例所示:

Java配置
@StepScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input.file.name]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

Step为了使用后期绑定,需要使用作用域,因为在Step启动之前实际上无法实例化Bean ,以允许找到属性。由于默认情况下它不是Spring容器的一部分,因此必须通过使用batch名称空间或通过显式包括的bean定义StepScope或使用@EnableBatchProcessing注释来显式添加范围仅使用其中一种方法。以下示例使用batch名称空间:

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:batch="http://www.springframework.org/schema/batch"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="...">
<batch:job .../>
...
</beans>

以下示例显式包括了bean定义:

<bean class="org.springframework.batch.core.scope.StepScope" />

5.4.2.工作范围

JobSpring Batch 3.0中引入的scope类似于Step配置中的scope,但是它是Job上下文的Scope ,因此每个正在运行的作业只有一个这样的bean实例。此外,还为后期绑定引用提供了支持,这些引用可从JobContext使用#{..}占位符的位置访问使用此功能,可以从作业或作业执行上下文以及作业参数中提取bean属性,如以下示例所示:

Java配置
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobParameters[input]}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}
Java配置
@JobScope
@Bean
public FlatFileItemReader flatFileItemReader(@Value("#{jobExecutionContext['input.name']}") String name) {
	return new FlatFileItemReaderBuilder<Foo>()
			.name("flatFileItemReader")
			.resource(new FileSystemResource(name))
			...
}

因为默认情况下它不是Spring容器的一部分,所以必须通过使用batch名称空间,通过为JobScope显式包含bean定义或使用@EnableBatchProcessing批注(但不是全部)来显式添加范围以下示例使用batch名称空间:

<beans xmlns="http://www.springframework.org/schema/beans"
		  xmlns:batch="http://www.springframework.org/schema/batch"
		  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
		  xsi:schemaLocation="...">

<batch:job .../>
...
</beans>

以下示例包含一个显式定义的Bean JobScope

<bean class="org.springframework.batch.core.scope.JobScope" />

6. ItemReaders和ItemWriters

所有批处理都可以以其最简单的形式描述为读取大量数据,执行某种类型的计算或转换并写出结果。Spring Batch的提供帮助的三个关键接口执行批量读取和写入: ItemReaderItemProcessor,和ItemWriter

6.1. ItemReader

尽管是一个简单的概念,但是an ItemReader是从许多不同类型的输入中提供数据的方法。最一般的示例包括:

  • 平面文件:平面文件项目读取器从平面文件中读取数据行,该文件通常描述记录的数据字段由文件中的固定位置定义或由某些特殊字符(例如逗号)分隔。

  • XML:XML ItemReaders处理XML独立于用于解析,映射和验证对象的技术。输入数据允许针对XSD模式验证XML文件。

  • 数据库:访问数据库资源以返回结果集,该结果集可以映射到对象以进行处理。默认的SQL ItemReader实现调用a RowMapper 返回对象,如果需要重新启动,则跟踪当前行,存储基本统计信息,并提供一些事务增强功能,稍后将进行说明。

还有更多的可能性,但在本章中我们将重点介绍基本的可能性。附录A中提供了所有可用ItemReader实现的 完整列表

ItemReader 是通用输入操作的基本接口,如以下接口定义所示:

public interface ItemReader<T> {

    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;

}

read方法定义的最基本合同ItemReader调用它返回一个项目,或者null如果没有更多的项目了。一个项目可能代表文件中的一行,数据库中的一行或XML文件中的元素。通常期望的是,这些被映射到可用的域对象(如TradeFoo或其它),但是没有在合同这样做没有任何要求。

预期该ItemReader接口的实现仅是转发的。但是,如果基础资源是事务性的(例如JMS队列),则read在回滚方案中,调用 可能会在后续调用中返回相同的逻辑项。还值得注意的是,缺少要处理的项目ItemReader不会导致引发异常。例如,ItemReader配置有返回0结果的查询的数据库null在第一次调用read时返回

6.2. ItemWriter

ItemWriter在功能上类似于ItemReader反操作。资源仍然需要定位,打开和关闭,但是它们的区别在于 ItemWriter写出而不是读入。对于数据库或队列,这些操作可能是插入,更新或发送。输出序列化的格式特定于每个批处理作业。

与一样ItemReaderItemWriter是一个相当通用的接口,如以下接口定义所示:

public interface ItemWriter<T> {

    void write(List<? extends T> items) throws Exception;

}

readon一样ItemReaderwrite提供的基本合同ItemWriter只要打开,它就会尝试写出传入项目的列表。因为通常期望将项目“分批”在一起,然后输出,所以接口接受项目列表,而不是项目本身。写入列表后,可以执行任何必要的刷新操作,然后再从write方法返回。例如,如果写入一个Hibernate DAO,则可以进行多个写入操作,每个项目一个。然后,编写者可以flush在返回之前调用休眠会话。

6.3. ItemProcessor

ItemReaderItemWriter接口都为他们的具体任务是非常有用的,但是如果你想要写之前插入商业逻辑是什么?读写的一种选择是使用复合模式:创建一个ItemWriter包含另一个的ItemWriter或一个ItemReader包含另一个的ItemReader以下代码显示了一个示例:

public class CompositeItemWriter<T> implements ItemWriter<T> {

    ItemWriter<T> itemWriter;

    public CompositeItemWriter(ItemWriter<T> itemWriter) {
        this.itemWriter = itemWriter;
    }

    public void write(List<? extends T> items) throws Exception {
        //Add business logic here
       itemWriter.write(items);
    }

    public void setDelegate(ItemWriter<T> itemWriter){
        this.itemWriter = itemWriter;
    }
}

上一类包含另一个ItemWriter,在提供了一些业务逻辑之后,它委托给它。该模式也可以轻松地用于ItemReader,也许可以基于main提供的输入来获取更多参考数据ItemReader如果您需要控制对write自己的呼叫,它也很有用但是,如果您只想在实际写入之前“转换”传递给写入的项目,则不需要您write自己。您可以修改项目。对于这种情况,Spring Batch提供了ItemProcessor接口,如以下接口定义所示:

public interface ItemProcessor<I, O> {

    O process(I item) throws Exception;
}

An ItemProcessor很简单。给定一个对象,对其进行转换,然后返回另一个。提供的对象可以是或可以不是相同的类型。关键是可以在流程中应用业务逻辑,并且完全由开发人员来创建该逻辑。一个ItemProcessor可直接连线到的步骤。例如,假设an ItemReader提供了一个类型类型,Foo并且Bar 在将其写出之前需要将其转换为type 以下示例显示了ItemProcessor执行转换的:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarWriter implements ItemWriter<Bar>{
    public void write(List<? extends Bar> bars) throws Exception {
        //write bars
    }
}

在前面的示例中,有一个class Foo,一个class Bar和一个 FooProcessor遵守该ItemProcessor接口的类。转换很简单,但是任何类型的转换都可以在这里完成。BarWriterBar 对象,抛出一个异常,如果提供任何其他类型。同样,FooProcessor如果Foo提供除a以外的任何内容,则 引发异常所述 FooProcessor然后可以注入一个Step,如图以下示例:

Java配置
@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJOb")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(barWriter())
				.build();
}

6.3.1.链接项目处理器

执行单个转换在许多情况下很有用,但是如果要将多个ItemProcessor实现“链接”在一起怎么办?这可以使用前面提到的复合图案来完成。为了更新先前的示例,Foo单个转换示例转换为Bar,然后将其转换为Foobar 并写出,如以下示例所示:

public class Foo {}

public class Bar {
    public Bar(Foo foo) {}
}

public class Foobar {
    public Foobar(Bar bar) {}
}

public class FooProcessor implements ItemProcessor<Foo,Bar>{
    public Bar process(Foo foo) throws Exception {
        //Perform simple transformation, convert a Foo to a Bar
        return new Bar(foo);
    }
}

public class BarProcessor implements ItemProcessor<Bar,Foobar>{
    public Foobar process(Bar bar) throws Exception {
        return new Foobar(bar);
    }
}

public class FoobarWriter implements ItemWriter<Foobar>{
    public void write(List<? extends Foobar> items) throws Exception {
        //write items
    }
}

可以将A FooProcessor和a BarProcessor链接在一起以得到结果 Foobar,如以下示例所示:

CompositeItemProcessor<Foo,Foobar> compositeProcessor =
                                      new CompositeItemProcessor<Foo,Foobar>();
List itemProcessors = new ArrayList();
itemProcessors.add(new FooTransformer());
itemProcessors.add(new BarTransformer());
compositeProcessor.setDelegates(itemProcessors);

与前面的示例一样,可以将复合处理器配置为 Step

Java配置
@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(compositeProcessor())
				.writer(foobarWriter())
				.build();
}

@Bean
public CompositeItemProcessor compositeProcessor() {
	List<ItemProcessor> delegates = new ArrayList<>(2);
	delegates.add(new FooProcessor());
	delegates.add(new BarProcessor());

	CompositeItemProcessor processor = new CompositeItemProcessor();

	processor.setDelegates(delegates);

	return processor;
}

6.3.2.筛选记录

项目处理器的一种典型用法是在将记录传递到记录之前过滤掉ItemWriter过滤是不同于跳过的动作。跳过表示一条记录无效,而过滤只是表明不应写入一条记录。

例如,考虑一个批处理作业,它读取一个包含三种不同类型记录的文件:要插入的记录,要更新的记录和要删除的记录。如果系统不支持删除记录,则我们不希望将任何“删除”记录发送到ItemWriter但是,由于这些记录实际上并不是不良记录,因此我们希望将其过滤掉而不是跳过它们。结果,ItemWriter只会收到“插入”和“更新”记录。

要过滤记录,您可以null返回ItemProcessor框架检测到结果为,null并避免将该项目添加到传递给的记录列表中ItemWriter与往常一样,从ItemProcessor结果中引发的异常将导致跳过。

6.3.3.容错能力

回滚块时,可能会重新处理读取过程中已缓存的项目。如果将某个步骤配置为容错的(通常通过使用跳过或重试处理),则ItemProcessor所使用的任何步骤都应以幂等的方式实施。通常,这包括对输入项不执行任何更改ItemProcessor,仅更新结果实例。

6.4. ItemStream

双方ItemReadersItemWriters满足他们的个人目的很好,但他们都认为必要的另一个接口之间的共同关注。通常,作为批处理作业范围的一部分,需要打开,关闭读取器和写入器,并需要一种持久化状态的机制。ItemStream接口用于此目的,如以下示例所示:

public interface ItemStream {

    void open(ExecutionContext executionContext) throws ItemStreamException;

    void update(ExecutionContext executionContext) throws ItemStreamException;

    void close() throws ItemStreamException;
}

在描述每种方法之前,我们应该提到ExecutionContextItemReader还实现的的客户端 ItemStream应在调用open之前先调用 read,以打开任何资源(例如文件)或获得连接。类似的限制适用于ItemWriter实现的ItemStream如第2章所述,如果在中找到了预期的数据ExecutionContext,则可以将其用于启动ItemReaderItemWriter在初始状态以外的其他位置。相反, close调用可以确保打开期间分配的所有资源都安全释放。 update主要是为了确保当前保持的任何状态都被加载到ExecutionContext在提交之前调用此方法,以确保在提交之前将当前状态保留在数据库中。

在特殊情况下,其中an的客户端ItemStream是a Step(来自Spring Batch Core),ExecutionContext则为每个StepExecution创建一个an,以允许用户存储特定执行的状态,并期望如果JobInstance再次启动该执行则返回该状态。对于那些熟悉Quartz的人来说,其语义与Quartz非常相似JobDataMap

6.5.委托模式和步骤注册

请注意,这CompositeItemWriter是委派模式的示例,这在Spring Batch中很常见。委托本身可以实现回调接口,例如StepListener如果他们这样做,如果结合正在使用Spring Batch的作为核心的一部分,他们StepJob,那么他们几乎肯定需要用手工登记Step直接连接到Step获取器中的读取器,写入器或处理器(如果实现ItemStreamStepListener接口)会自动注册但是,由于委托并不为人所知Step,因此需要将其作为侦听器或流(或在适当时将两者同时注入)注入,如以下示例所示:

Java配置
@Bean
public Job ioSampleJob() {
	return this.jobBuilderFactory.get("ioSampleJob")
				.start(step1())
				.end()
				.build();
}

@Bean
public Step step1() {
	return this.stepBuilderFactory.get("step1")
				.<String, String>chunk(2)
				.reader(fooReader())
				.processor(fooProcessor())
				.writer(compositeItemWriter())
				.stream(barWriter())
				.build();
}

@Bean
public CustomCompositeItemWriter compositeItemWriter() {

	CustomCompositeItemWriter writer = new CustomCompositeItemWriter();

	writer.setDelegate(barWriter());

	return writer;
}

@Bean
public BarWriter barWriter() {
	return new BarWriter();
}

6.6.平面文件

交换批量数据的最常见机制之一一直是平面文件。与XML具有定义其结构化(XSD)的商定标准不同,任何阅读平面文件的人必须提前了解文件的结构。通常,所有平面文件都分为两种:定界文件和定长文件。分隔文件是指用逗号分隔分隔符的字段。固定长度文件具有设置长度的字段。

6.6.1.FieldSet

在Spring Batch中使用平面文件时,无论是用于输入还是用于输出,最重要的类之一是FieldSet许多体系结构和库都包含抽象,以帮助您从文件中读取数据,但是它们通常返回一个StringString对象数组这真的只会让您半途而废。A FieldSet是Spring Batch的抽象,用于启用文件资源中字段的绑定。它使开发人员可以像处理数据库输入一样使用文件输入。FieldSet概念上与JDBC类似 ResultSetA FieldSet只需要一个参数:String令牌数组。(可选)您还可以配置字段的名称,以便可以按index或名称后面的方式访问这些字段ResultSet,如以下示例所示:

String[] tokens = new String[]{"foo", "1", "true"};
FieldSet fs = new DefaultFieldSet(tokens);
String name = fs.readString(0);
int value = fs.readInt(1);
boolean booleanValue = fs.readBoolean(2);

FieldSet界面上还有更多选项,例如Date,long BigDecimal和等等。的最大优点FieldSet是,它提供了平面文件输入的一致解析。在处理由格式异常引起的错误或进行简单的数据转换时,它可以保持一致,而不是使每个批处理作业以潜在的意外方式进行不同的解析。

6.6.2. FlatFileItemReader

平面文件是最多包含二维(表格)数据的任何类型的文件。名为类有助于在Spring Batch框架中读取平面文件,该类 FlatFileItemReader提供了用于读取和解析平面文件的基本功能。的最重要的两个必需的依赖FlatFileItemReaderResourceLineMapperLineMapper下一节将进一步探讨界面。resource属性代表一个Spring Core Resource可以在Spring Framework的第5章中找到说明如何创建这种类型的bean的文档 因此,Resource除了显示以下简单示例之外,本指南不介绍创建对象的详细信息

Resource resource = new FileSystemResource("resources/trades.csv");

在复杂的批处理环境中,目录结构通常由EAI基础结构管理,其中建立了用于外部接口的放置区,以将文件从FTP位置移动到批处理位置,反之亦然。文件移动实用程序超出了Spring Batch体系结构的范围,但是批处理作业流中包括文件移动实用程序作为作业流中的步骤并不少见。批处理体系结构只需要知道如何找到要处理的文件。Spring Batch从此起点开始将数据馈送到管道中的过程。但是, Spring Integration提供了许多此类服务。

下表中的其他属性FlatFileItemReader使您可以进一步指定如何解释数据:

表15. FlatFileItemReader属性
属性 类型 描述

注释

串[]

指定指示注释行的行前缀。

编码方式

指定要使用的文本编码。默认值为Charset.defaultCharset()

lineMapper

LineMapper

将转换StringObject代表项。

linesToSkip

整型

文件顶部要忽略的行数。

recordSeparatorPolicy

RecordSeparatorPolicy

用于确定行尾的位置,并执行诸如在带引号的字符串中继续到行尾的操作。

资源

Resource

从中读取资源。

skippedLinesCallback

LineCallbackHandler

传递要跳过的文件中各行的原始行内容的接口。如果linesToSkip设置为2,则此接口被调用两次。

严格

布尔值

在严格模式下,ExecutionContext如果输入资源不存在,则读取器将引发异常否则,它将记录问题并继续。

LineMapper

与一样RowMapper,它采用诸如之类的低级构造ResultSet并返回Object,平面文件处理需要相同的构造才能将String行转换为Object,如以下接口定义所示:

public interface LineMapper<T> {

    T mapLine(String line, int lineNumber) throws Exception;

}

基本约定是,给定当前行和与其关联的行号,映射器应返回结果域对象。这与相似 RowMapper,因为每一行都与其行号相关联,就像a中的每一行都 ResultSet与其行号相关联。这允许将行号绑定到结果域对象,以进行身份​​比较或提供更多信息。但是,与不同RowMapperLineMapper给了原始行,如上所述,原始行只会使您半途而废。该行必须标记为FieldSet,然后可以映射到对象,如本文档后面所述。

LineTokenizer

必须将输入行转换为a的抽象,FieldSet因为可能需要将多种格式的平面文件数据转换为FieldSet在Spring Batch中,此接口是LineTokenizer

public interface LineTokenizer {

    FieldSet tokenize(String line);

}

a的约定LineTokenizer是这样的:给定一行输入(理论上,它 String可以包含多条线),FieldSet则返回代表该行的a FieldSet可以被传递到FieldSetMapperSpring Batch包含以下LineTokenizer实现:

  • DelimitedLineTokenizer:用于记录中的字段由定界符分隔的文件。最常见的定界符是逗号,但是也经常使用竖线或分号。

  • FixedLengthTokenizer:用于记录中的字段均为“固定宽度”的文件。必须为每种记录类型定义每个字段的宽度。

  • PatternMatchingCompositeLineTokenizerLineTokenizer通过检查模式来确定应在特定行上使用的分词器列表中的一个。

FieldSetMapper

FieldSetMapper接口定义了一个方法,mapFieldSet方法接受一个 FieldSet对象并将其内容映射到一个对象。根据作业的需要,此对象可以是自定义DTO,域对象或数组。FieldSetMapper是在与结合使用LineTokenizer的线从资源数据的翻译成所需类型的对象,如示于下述的接口定义:

public interface FieldSetMapper<T> {

    T mapFieldSet(FieldSet fieldSet) throws BindException;

}

使用的模式与所RowMapper使用的模式相同JdbcTemplate

DefaultLineMapper

既然已经定义了读取平面文件的基本接口,那么很明显,需要三个基本步骤:

  1. 从文件中读取一行。

  2. String传递LineTokenizer#tokenize()方法中以检索 FieldSet

  3. FieldSet令牌化FieldSetMapper返回的结果传递,返回ItemReader#read()方法的结果

上面描述的两个接口代表两个单独的任务:将线转换为a FieldSet和将a映射FieldSet到域对象。因为a LineTokenizer的输入与LineMapper(a行)的输入匹配,并且a的输出与a FieldSetMapper的输出匹配LineMapper所以提供LineTokenizer同时使用a 和a 的默认实现FieldSetMapperDefaultLineMapper,在下面的类定义所示,代表的行为大多数用户需要:

public class DefaultLineMapper<T> implements LineMapper<>, InitializingBean {

    private LineTokenizer tokenizer;

    private FieldSetMapper<T> fieldSetMapper;

    public T mapLine(String line, int lineNumber) throws Exception {
        return fieldSetMapper.mapFieldSet(tokenizer.tokenize(line));
    }

    public void setLineTokenizer(LineTokenizer tokenizer) {
        this.tokenizer = tokenizer;
    }

    public void setFieldSetMapper(FieldSetMapper<T> fieldSetMapper) {
        this.fieldSetMapper = fieldSetMapper;
    }
}

以上功能是默认实现中提供的,而不是内置于阅读器本身中(如在框架的先前版本中所做的那样),以使用户在控制解析过程时具有更大的灵活性,尤其是在需要访问原始行的情况下。

简单的分隔文件读取示例

以下示例说明了如何使用实际域方案读取平面文件。这个特定的批处理作业从以下文件中读取足球运动员:

ID,姓氏,名字,位置,出生年份,首次亮相
“ AbduKa00,阿卜杜勒-贾巴尔,卡里姆,rb,1974年,1996年”,
“ AbduRa00,阿卜杜拉,拉比,Rb,1975,1999”,
“ AberWa00,Abercrombie,沃尔特,rb,1959年,1982年”,
“ AbraDa00,Abramowicz,丹尼,Wr,1945,1967”,
“ AdamBo00,亚当斯,鲍勃,te,1946,1969”,
“ AdamCh00,Adams,Charlie,wr,1979,2003”

该文件的内容映射到以下 Player域对象:

public class Player implements Serializable {

    private String ID;
    private String lastName;
    private String firstName;
    private String position;
    private int birthYear;
    private int debutYear;

    public String toString() {
        return "PLAYER:ID=" + ID + ",Last Name=" + lastName +
            ",First Name=" + firstName + ",Position=" + position +
            ",Birth Year=" + birthYear + ",DebutYear=" +
            debutYear;
    }

    // setters and getters...
}

要将a映射FieldSet到一个Player对象,FieldSetMapper需要定义一个返回玩家的a ,如以下示例所示:

protected static class PlayerFieldSetMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fieldSet) {
        Player player = new Player();

        player.setID(fieldSet.readString(0));
        player.setLastName(fieldSet.readString(1));
        player.setFirstName(fieldSet.readString(2));
        player.setPosition(fieldSet.readString(3));
        player.setBirthYear(fieldSet.readInt(4));
        player.setDebutYear(fieldSet.readInt(5));

        return player;
    }
}

然后可以通过正确构造a FlatFileItemReader并调用 来读取文件read,如以下示例所示:

FlatFileItemReader<Player> itemReader = new FlatFileItemReader<>();
itemReader.setResource(new FileSystemResource("resources/players.csv"));
//DelimitedLineTokenizer defaults to comma as its delimiter
DefaultLineMapper<Player> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(new DelimitedLineTokenizer());
lineMapper.setFieldSetMapper(new PlayerFieldSetMapper());
itemReader.setLineMapper(lineMapper);
itemReader.open(new ExecutionContext());
Player player = itemReader.read();

每次调用read都会Player从文件的每一行返回一个新 对象。到达文件末尾时,null将返回。

按名称映射字段

存在是受两个允许的功能的一个附加件 DelimitedLineTokenizerFixedLengthTokenizer,这是在功能上类似的JDBC ResultSet可以将字段名称注入这些LineTokenizer实现中的任何一种中, 以提高映射功能的可读性。首先,将平面文件中所有字段的列名注入令牌生成器,如以下示例所示:

tokenizer.setNames(new String[] {"ID", "lastName","firstName","position","birthYear","debutYear"});

A FieldSetMapper可以如下使用此信息:

public class PlayerMapper implements FieldSetMapper<Player> {
    public Player mapFieldSet(FieldSet fs) {

       if(fs == null){
           return null;
       }

       Player player = new Player();
       player.setID(fs.readString("ID"));
       player.setLastName(fs.readString("lastName"));
       player.setFirstName(fs.readString("firstName"));
       player.setPosition(fs.readString("position"));
       player.setDebutYear(fs.readInt("debutYear"));
       player.setBirthYear(fs.readInt("birthYear"));

       return player;
   }
}
将字段集自动映射到域对象

对于许多人来说,不必编写特定FieldSetMapper同样繁琐写一个特定RowMapperJdbcTemplateSpring Batch通过提供一个FieldSetMapper,通过使用JavaBean规范将字段名称与对象上的设置器进行匹配来自动映射字段,从而使此操作变得更加容易再次使用Football示例, BeanWrapperFieldSetMapper配置看起来像以下代码片段:

Java配置
@Bean
public FieldSetMapper fieldSetMapper() {
	BeanWrapperFieldSetMapper fieldSetMapper = new BeanWrapperFieldSetMapper();

	fieldSetMapper.setPrototypeBeanName("player");

	return fieldSetMapper;
}

@Bean
@Scope("prototype")
public Player player() {
	return new Player();
}

对于Map中的每个条目FieldSet,映射器都以Player与Spring容器查找与属性名称匹配的设置器相同的方式,在对象的新实例上寻找对应的设置器(由于这个原因,需要原型作用域)。FieldSet映射了中的每个可用字段,并Player返回了生成的对象,不需要任何代码。

固定长度文件格式

到目前为止,仅详细讨论了定界文件。但是,它们仅代表文件读取图片的一半。许多使用平面文件的组织使用固定长度格式。固定长度文件示例如下:

UK21341EAH4121131.11customer1
UK21341EAH4221232.11customer2
UK21341EAH4321333.11customer3
UK21341EAH4421434.11customer4
UK21341EAH4521535.11customer5

虽然这看起来像一个大字段,但实际上代表了4个不同的字段:

  1. ISIN:所订购商品的唯一标识符-12个字符长。

  2. 数量:所订购商品的数量-3个字符长。

  3. 价格:商品价格-5个字符长。

  4. 客户:订购商品的客户的ID-9个字符长。

配置时FixedLengthLineTokenizer,必须以范围的形式提供每个长度,如以下示例所示:

要支持上述范围语法,需要RangeArrayPropertyEditor在中配置一个专门的属性编辑器 ApplicationContext但是,将ApplicationContext在使用批处理名称空间位置自动声明此bean

Java配置
@Bean
public FixedLengthTokenizer fixedLengthTokenizer() {
	FixedLengthTokenizer tokenizer = new FixedLengthTokenizer();

	tokenizer.setNames("ISIN", "Quantity", "Price", "Customer");
	tokenizer.setColumns(new Range(1-12),
						new Range(13-15),
						new Range(16-20),
						new Range(21-29));

	return tokenizer;
}

因为FixedLengthLineTokenizer使用与LineTokenizer上面讨论的相同的接口,所以它返回的结果与FieldSet使用分隔符相同这样就可以使用相同的方法来处理其输出,例如使用 BeanWrapperFieldSetMapper

单个文件中的多种记录类型

到目前为止,为简单起见,所有文件读取示例都做出了关键假设:文件中的所有记录都具有相同的格式。但是,并非总是如此。通常,文件中的记录可能具有不同的格式,需要对其进行不同的标记和映射到不同的对象。以下文件摘录对此进行了说明:

USER; Smith; Peter ;; T; 20014539; F
线; 1044391041ABC037.49G201XX1383.12H
LINEB; 2134776319DEF422.99M005LI

在此文件中,我们有三种类型的记录:“ USER”,“ LINEA”和“ LINEB”。“ USER”行对应于一个User对象。Line尽管“ LINEA”比“ LINEB”具有更多信息,但“ LINEA”和“ LINEB”都对应于对象。

ItemReader单独读取每个行,但我们必须指定不同 LineTokenizerFieldSetMapper对象,以便ItemWriter接收正确的项目。PatternMatchingCompositeLineMapper通过允许的模式来映射使得这个容易LineTokenizer实例和图案FieldSetMapper实例进行配置,如图中下面的例子:

Java配置
@Bean
public PatternMatchingCompositeLineMapper orderFileLineMapper() {
	PatternMatchingCompositeLineMapper lineMapper =
		new PatternMatchingCompositeLineMapper();

	Map<String, LineTokenizer> tokenizers = new HashMap<>(3);
	tokenizers.put("USER*", userTokenizer());
	tokenizers.put("LINEA*", lineATokenizer());
	tokenizers.put("LINEB*", lineBTokenizer());

	lineMapper.setTokenizers(tokenizers);

	Map<String, FieldSetMapper> mappers = new HashMap<>(2);
	mappers.put("USER*", userFieldSetMapper());
	mappers.put("LINE*", lineFieldSetMapper());

	lineMapper.setFieldSetMappers(mappers);

	return lineMapper;
}

在此示例中,“ LINEA”和“ LINEB”具有单独的LineTokenizer实例,但是它们都使用相同的FieldSetMapper

PatternMatchingCompositeLineMapper使用PatternMatcher#match,以便选择每行的正确委托方法。PatternMatcher允许两个通配符具有特殊含义:问号(“?”)匹配一个字符,而星号(“*”)匹配零个或多个字符。请注意,在上述配置中,所有模式都以星号结尾,从而使它们有效地成为行的前缀。PatternMatcher总是匹配最具体的模式可能,无论在配置的顺序。因此,如果将“ LINE *”和“ LINEA *”都列为模式,则“ LINEA”将与模式“ LINEA *”匹配,而“ LINEB”将与模式“ LINE *”匹配。此外,单个星号(“ *”

Java配置
...
tokenizers.put("*", defaultLineTokenizer());
...

还有一个PatternMatchingCompositeLineTokenizer可单独用于标记化。

平面文件包含每个跨越多行的记录也是很常见的。为了处理这种情况,需要更复杂的策略。multiLineRecords样本中可以找到这种常见模式的演示

平面文件中的异常处理

在许多情况下,对行进行标记可能会引发异常。许多平面文件是不完善的,并且包含格式错误的记录。许多用户选择在记录问题,原始行和行号时跳过这些错误的行。以后可以手动或通过其他批处理作业检查这些日志。因此,Spring Batch提供了用于处理解析异常的异常层次结构: FlatFileParseExceptionFlatFileFormatException尝试读取文件时遇到任何错误时FlatFileParseException,将引发FlatFileItemReaderFlatFileFormatExceptionLineTokenizer 接口的实现抛出,指示令牌化时遇到的更具体的错误。

IncorrectTokenCountException

双方DelimitedLineTokenizerFixedLengthLineTokenizer必须指定可用于创建一个列名的能力FieldSet但是,如果列名的数量与对行进行标记时找到的列数不匹配,FieldSet 则无法创建,并IncorrectTokenCountException引发an ,其中包含遇到的令牌数和预期的数,如以下示例所示。 :

tokenizer.setNames(new String[] {"A", "B", "C", "D"});

try {
    tokenizer.tokenize("a,b,c");
}
catch(IncorrectTokenCountException e){
    assertEquals(4, e.getExpectedCount());
    assertEquals(3, e.getActualCount());
}

因为令牌化程序配置了4个列名,但是在文件中仅找到3个令牌,所以IncorrectTokenCountException抛出。

IncorrectLineLengthException

解析为固定长度格式的文件在解析时还有其他要求,因为与分隔格式不同,每一列必须严格遵守其预定义宽度。如果总行长不等于此列的最宽值,则将引发异常,如以下示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5),
                                   new Range(6, 10),
                                   new Range(11, 15) });
try {
    tokenizer.tokenize("12345");
    fail("Expected IncorrectLineLengthException");
}
catch (IncorrectLineLengthException ex) {
    assertEquals(15, ex.getExpectedLength());
    assertEquals(5, ex.getActualLength());
}

上面标记器的配置范围是:1-5、6-10和11-15.因此,该行的总长度为15.但是,在前面的示例中,传入了长度为5的行,从而IncorrectLineLengthException引发了。在这里抛出异常,而不是仅映射第一列,可以使行的处理更早地失败,并且所包含的信息比如果尝试读取a的第2列失败时所包含的信息更多FieldSetMapper但是,在某些情况下,线的长度并不总是恒定的。因此,可以通过'strict'属性关闭行长的验证,如以下示例所示:

tokenizer.setColumns(new Range[] { new Range(1, 5), new Range(6, 10) });
tokenizer.setStrict(false);
FieldSet tokens = tokenizer.tokenize("12345");
assertEquals("12345", tokens.readString(0));
assertEquals("", tokens.readString(1));

前面的示例与之前的示例几乎完全相同,只是 tokenizer.setStrict(false)被调用了。此设置告诉令牌化程序在对行进行令牌化时不要强制行长。FieldSet现在已正确创建并返回A。但是,对于其余值,它仅包含空标记。

6.6.3. FlatFileItemWriter

写入平面文件具有必须解决的相同问题。步骤必须能够以事务方式编写定界或定长格式。

LineAggregator

正如LineTokenizer需要一个接口将一个项目并将其转换为一样 String,文件写入必须具有一种将多个字段聚合到单个字符串中以写入文件的方法。在Spring Batch中,这是LineAggregator,显示在以下接口定义中:

public interface LineAggregator<T> {

    public String aggregate(T item);

}

LineAggregator是的逻辑相反LineTokenizerLineTokenizer接受 String并返回a FieldSet,而LineAggregator接受item并返回a String

PassThroughLineAggregator

LineAggregator接口的最基本实现PassThroughLineAggregator,它假定对象已经是一个字符串或其字符串表示形式可以接受编写,如以下代码所示:

public class PassThroughLineAggregator<T> implements LineAggregator<T> {

    public String aggregate(T item) {
        return item.toString();
    }
}

如果需要直接控制创建字符串,但是必须具有a的优点(FlatFileItemWriter例如事务和重新启动支持),则上述实现非常有用

简化文件编写示例

现在已经定义LineAggregator接口及其最基本的实现, PassThroughLineAggregator下面可以解释基本的编写流程:

  1. 要写入的对象传递给以LineAggregator获得 String

  2. 返回的String内容将写入配置的文件。

以下摘录是FlatFileItemWriter用代码表示的:

public void write(T item) throws Exception {
    write(lineAggregator.aggregate(item) + LINE_SEPARATOR);
}

一个简单的配置可能如下所示:

Java配置
@Bean
public FlatFileItemWriter itemWriter() {
	return  new FlatFileItemWriterBuilder<Foo>()
           			.name("itemWriter")
           			.resource(new FileSystemResource("target/test-outputs/output.txt"))
           			.lineAggregator(new PassThroughLineAggregator<>())
           			.build();
}
FieldExtractor

前面的示例对于写入文件的最基本用途可能很有用。但是,的大多数用户FlatFileItemWriter都有一个需要写出的域对象,因此必须将其转换为一行。在文件读取中,需要以下内容:

  1. 从文件中读取一行。

  2. 将行传递到LineTokenizer#tokenize()方法中,以检索 FieldSet

  3. FieldSet令牌化FieldSetMapper返回的结果传递,返回ItemReader#read()方法的结果

文件写入具有相似但相反的步骤:

  1. 将要写入的项目传递给作家。

  2. 将项目上的字段转换为数组。

  3. 将结果数组聚合为一行。

因为框架没有办法知道需要写出对象的哪些字段,所以FieldExtractor必须编写一个,以完成将项目变成数组的任务,如以下接口定义所示:

public interface FieldExtractor<T> {

    Object[] extract(T item);

}

FieldExtractor接口的实现应从提供的对象的字段中创建一个数组,然后可以使用元素之间的定界符或将其作为固定宽度的行的一部分来写出该数组。

PassThroughFieldExtractor

在许多情况下,需要写出集合(例如数组Collection,或)FieldSet从这些集合类型之一中“提取”数组非常简单。为此,请将集合转换为数组。因此, PassThroughFieldExtractor在这种情况下应使用。应该注意的是,如果传入的对象不是集合的类型,则PassThroughFieldExtractor 返回一个仅包含要提取的项目的数组。

BeanWrapperFieldExtractor

BeanWrapperFieldSetMapper文件读取部分所述,通常最好配置如何将域对象转换为对象数组,而不是自己编写转换。BeanWrapperFieldExtractor提供了此功能,如图以下示例:

BeanWrapperFieldExtractor<Name> extractor = new BeanWrapperFieldExtractor<>();
extractor.setNames(new String[] { "first", "last", "born" });

String first = "Alan";
String last = "Turing";
int born = 1912;

Name n = new Name(first, last, born);
Object[] values = extractor.extract(n);

assertEquals(first, values[0]);
assertEquals(last, values[1]);
assertEquals(born, values[2]);

此提取器实现只有一个必需的属性:要映射的字段的名称。就像BeanWrapperFieldSetMapper需求字段名称映射FieldSet到提供的对象上的setter上需求字段一样 BeanWrapperFieldExtractor需求名称映射到用于创建对象数组的getters。值得注意的是,名称的顺序决定了数组中字段的顺序。

分隔文件编写示例

最基本的平面文件格式是其中所有字段都由定界符分隔的格式。可以使用来完成DelimitedLineAggregator下面的示例写出一个简单的域对象,该对象代表客户帐户的贷方:

public class CustomerCredit {

    private int id;
    private String name;
    private BigDecimal credit;

    //getters and setters removed for clarity
}

由于正在使用域对象,因此FieldExtractor 必须提供接口的实现以及要使用的分隔符,如以下示例所示:

Java配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	DelimitedLineAggregator<CustomerCredit> lineAggregator = new DelimitedLineAggregator<>();
	lineAggregator.setDelimiter(",");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

在前面的示例中,BeanWrapperFieldExtractor本章前面的内容用于将名称和贷方字段CustomerCredit转换为对象数组,然后用每个字段之间的逗号将其写出。

也可以使用FlatFileItemWriterBuilder.DelimitedBuilder来自动创建BeanWrapperFieldExtractorDelimitedLineAggregator ,如以下示例所示:

Java配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.delimited()
				.delimiter("|")
				.names(new String[] {"name", "credit"})
				.build();
}
定宽文件写入示例

分隔不是平面文件格式的唯一类型。许多人更喜欢为每个列使用固定宽度来在字段之间划定轮廓,这通常称为“固定宽度”。Spring Batch支持使用FormatterLineAggregator使用上述相同的CustomerCredit域对象,可以将其配置如下:

Java配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	BeanWrapperFieldExtractor<CustomerCredit> fieldExtractor = new BeanWrapperFieldExtractor<>();
	fieldExtractor.setNames(new String[] {"name", "credit"});
	fieldExtractor.afterPropertiesSet();

	FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
	lineAggregator.setFormat("%-9s%-2.0f");
	lineAggregator.setFieldExtractor(fieldExtractor);

	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.lineAggregator(lineAggregator)
				.build();
}

前面的大多数示例应该看起来很熟悉。但是,format属性的值是new,并显示在以下元素中:

...
FormatterLineAggregator<CustomerCredit> lineAggregator = new FormatterLineAggregator<>();
lineAggregator.setFormat("%-9s%-2.0f");
...

基础实现是使用与FormatterJava 5 相同的功能构建 的。Java Formatter基于printfC编程语言功能。有关如何配置格式化程序的大多数详细信息,可以在Formatter的Javadoc中找到

也可以使用FlatFileItemWriterBuilder.FormattedBuilder来自动创建BeanWrapperFieldExtractorFormatterLineAggregator ,如以下示例所示:

Java配置
@Bean
public FlatFileItemWriter<CustomerCredit> itemWriter(Resource outputResource) throws Exception {
	return new FlatFileItemWriterBuilder<CustomerCredit>()
				.name("customerCreditWriter")
				.resource(outputResource)
				.formatted()
				.format("%-9s%-2.0f")
				.names(new String[] {"name", "credit"})
				.build();
}
处理文件创建

FlatFileItemReader与文件资源有非常简单的关系。初始化阅读器后,它将打开文件(如果存在),如果没有,则引发异常。文件写入不是那么简单。乍一看,似乎应该存在类似的简单约定FlatFileItemWriter:如果文件已经存在,则引发异常;如果文件不存在,则创建它并开始写入。但是,可能重新启动Job可能会导致问题。在正常的重新启动方案中,合同是相反的:如果文件存在,请从最后一个已知的好的位置开始对其进行写入,如果不存在,则引发异常。但是,如果此作业的文件名始终相同会怎样?在这种情况下,您希望删除该文件(如果存在),除非重新启动。由于这种可能性,FlatFileItemWriter 包含属性shouldDeleteIfExists将此属性设置为true会导致在打开编写器时删除具有相同名称的现有文件。

6.7.XML项目读取器和写入器

Spring Batch提供了用于读取XML记录并将其映射到Java对象以及将Java对象编写为XML记录的事务性基础结构。

流XML的约束

StAX API用于I / O,因为其他标准XML解析API不能满足批处理要求(DOM一次将整个输入加载到内存中,而SAX通过允许用户仅提供回调来控制解析过程)。

我们需要考虑在Spring Batch中XML输入和输出如何工作。首先,有一些概念与文件读写不同,但在Spring Batch XML处理中很常见。通过XML处理,而不是FieldSet需要标记的记录行(实例),它假定XML资源是与各个记录相对应的“片段”的集合,如下图所示:

XML输入
图17. XML输入

在上述方案中,“贸易”标签被定义为“根元素”。'<trade>'和'</ trade>'之间的所有内容都被视为一个“片段”。Spring Batch使用对象/ XML映射(OXM)将片段绑定到对象。但是,Spring Batch不与任何特定的XML绑定技术绑定。典型的用途是委托 Spring OXM,它为最流行的OXM技术提供统一的抽象。对Spring OXM的依赖关系是可选的,如果需要,您可以选择实现特定于Spring Batch的接口。下图显示了与OXM支持的技术的关系:

OXM绑定
图18. OXM绑定

通过对OXM的介绍以及如何使用XML片段表示记录,我们现在可以更仔细地检查读者和作家。

6.7.1. StaxEventItemReader

StaxEventItemReader配置提供了用于处理XML输入流中的记录的典型设置。首先,考虑StaxEventItemReader可以处理的以下XML记录集

<?xml version="1.0" encoding="UTF-8"?>
<records>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0001</isin>
        <quantity>5</quantity>
        <price>11.39</price>
        <customer>Customer1</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0002</isin>
        <quantity>2</quantity>
        <price>72.99</price>
        <customer>Customer2c</customer>
    </trade>
    <trade xmlns="https://springframework.org/batch/sample/io/oxm/domain">
        <isin>XYZ0003</isin>
        <quantity>9</quantity>
        <price>99.99</price>
        <customer>Customer3</customer>
    </trade>
</records>

为了能够处理XML记录,需要满足以下条件:

  • 根元素名称:构成要映射的对象的片段的根元素的名称。示例配置通过贸易价值展示了这一点。

  • 资源:表示要读取的文件的Spring资源。

  • Unmarshaller:Spring OXM提供的解组工具,用于将XML片段映射到对象。

以下示例说明了如何定义StaxEventItemReader与名为trade,的资源org/springframework/batch/item/xml/domain/trades.xml和称为的解组器的根元素一起使用的tradeMarshaller

Java配置
@Bean
public StaxEventItemReader itemReader() {
	return new StaxEventItemReaderBuilder<Trade>()
			.name("itemReader")
			.resource(new FileSystemResource("org/springframework/batch/item/xml/domain/trades.xml"))
			.addFragmentRootElements("trade")
			.unmarshaller(tradeMarshaller())
			.build();

}

请注意,在此示例中,我们选择使用XStreamMarshaller,接受接受作为映射传入的别名,该别名的第一个键和值是片段的名称(即根元素)和要绑定的对象类型。然后,类似于a FieldSet,映射到对象类型内字段的其他元素的名称在映射中描述为键/值对。在配置文件中,我们可以使用Spring配置实用程序来描述所需的别名,如下所示:

Java配置
@Bean
public XStreamMarshaller tradeMarshaller() {
	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	XStreamMarshaller marshaller = new XStreamMarshaller();

	marshaller.setAliases(aliases);

	return marshaller;
}

在输入时,阅读器读取XML资源,直到它识别出一个新的片段即将开始。默认情况下,阅读器将匹配元素名称以识别新片段即将开始。读取器从该片段创建一个独立的XML文档,并将该文档传递给解串器(通常是Spring OXM的包装器Unmarshaller),以将XML映射到Java对象。

总之,此过程类似于以下Java代码,该代码使用Spring配置提供的注入:

StaxEventItemReader<Trade> xmlStaxEventItemReader = new StaxEventItemReader<>();
Resource resource = new ByteArrayResource(xmlResource.getBytes());

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
XStreamMarshaller unmarshaller = new XStreamMarshaller();
unmarshaller.setAliases(aliases);
xmlStaxEventItemReader.setUnmarshaller(unmarshaller);
xmlStaxEventItemReader.setResource(resource);
xmlStaxEventItemReader.setFragmentRootElementName("trade");
xmlStaxEventItemReader.open(new ExecutionContext());

boolean hasNext = true;

Trade trade = null;

while (hasNext) {
    trade = xmlStaxEventItemReader.read();
    if (trade == null) {
        hasNext = false;
    }
    else {
        System.out.println(trade);
    }
}

6.7.2. StaxEventItemWriter

输出与输入对称地工作。StaxEventItemWriter需要Resource,一个编组,和rootTagName将Java对象传递给编组器(通常是标准的Spring OXM Marshaller),该编组Resource器使用自定义事件编写器写入,以过滤OXM工具为每个片段生成StartDocumentEndDocument事件。以下示例使用 StaxEventItemWriter

Java配置
@Bean
public StaxEventItemWriter itemWriter(Resource outputResource) {
	return new StaxEventItemWriterBuilder<Trade>()
			.name("tradesWriter")
			.marshaller(tradeMarshaller())
			.resource(outputResource)
			.rootTagName("trade")
			.overwriteOutput(true)
			.build();

}

前面的配置设置了三个必需的属性,并设置了可选 overwriteOutput=true属性,本章前面提到的用于指定是否可以覆盖现有文件。应当注意,以下示例中用于编写程序的编组器与本章前面的阅读示例中使用的编组器完全相同:

Java配置
@Bean
public XStreamMarshaller customerCreditMarshaller() {
	XStreamMarshaller marshaller = new XStreamMarshaller();

	Map<String, Class> aliases = new HashMap<>();
	aliases.put("trade", Trade.class);
	aliases.put("price", BigDecimal.class);
	aliases.put("isin", String.class);
	aliases.put("customer", String.class);
	aliases.put("quantity", Long.class);

	marshaller.setAliases(aliases);

	return marshaller;
}

总结一下Java示例,以下代码说明了所有讨论的要点,展示了所需属性的编程设置:

FileSystemResource resource = new FileSystemResource("data/outputFile.xml")

Map aliases = new HashMap();
aliases.put("trade","org.springframework.batch.sample.domain.trade.Trade");
aliases.put("price","java.math.BigDecimal");
aliases.put("customer","java.lang.String");
aliases.put("isin","java.lang.String");
aliases.put("quantity","java.lang.Long");
Marshaller marshaller = new XStreamMarshaller();
marshaller.setAliases(aliases);

StaxEventItemWriter staxItemWriter =
	new StaxEventItemWriterBuilder<Trade>()
				.name("tradesWriter")
				.marshaller(marshaller)
				.resource(resource)
				.rootTagName("trade")
				.overwriteOutput(true)
				.build();

staxItemWriter.afterPropertiesSet();

ExecutionContext executionContext = new ExecutionContext();
staxItemWriter.open(executionContext);
Trade trade = new Trade();
trade.setPrice(11.39);
trade.setIsin("XYZ0001");
trade.setQuantity(5L);
trade.setCustomer("Customer1");
staxItemWriter.write(trade);

6.8.JSON项目读取器和写入器

Spring Batch提供了以下格式的读取和写入JSON资源的支持:

[
  {
    "isin": "123",
    "quantity": 1,
    "price": 1.2,
    "customer": "foo"
  },
  {
    "isin": "456",
    "quantity": 2,
    "price": 1.4,
    "customer": "bar"
  }
]

假定JSON资源是与各个项目相对应的JSON对象数组。Spring Batch未绑定到任何特定的JSON库。

6.8.1. JsonItemReader

JsonItemReader代表JSON解析,并结合中实现 org.springframework.batch.item.json.JsonObjectReader接口。该接口旨在通过使用流API读取大块的JSON对象来实现。当前提供了两种实现:

  • 杰克逊通过org.springframework.batch.item.json.JacksonJsonObjectReader

  • 格森通过org.springframework.batch.item.json.GsonJsonObjectReader

为了能够处理JSON记录,需要满足以下条件:

  • Resource:一个Spring Resource,代表要读取的JSON文件。

  • JsonObjectReader:JSON对象读取器,用于将JSON对象解析并绑定到项目

以下示例显示了如何定义一个JsonItemReader与先前的JSON资源一起使用的org/springframework/batch/item/json/trades.jsonJsonObjectReader基于Jackson的一个:

@Bean
public JsonItemReader<Trade> jsonItemReader() {
   return new JsonItemReaderBuilder<Trade>()
                 .jsonObjectReader(new JacksonJsonObjectReader<>(Trade.class))
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonItemReader")
                 .build();
}

6.8.2. JsonFileItemWriter

JsonFileItemWriter项目的编组委派到 org.springframework.batch.item.json.JsonObjectMarshaller接口。该接口的约定是获取一个对象并将其编组为JSON String当前提供了两种实现:

  • 杰克逊通过org.springframework.batch.item.json.JacksonJsonObjectMarshaller

  • 格森通过org.springframework.batch.item.json.GsonJsonObjectMarshaller

为了能够写入JSON记录,需要满足以下条件:

  • ResourceResource代表要写入的JSON文件的Spring

  • JsonObjectMarshaller:将对象编组为JSON格式的JSON对象编组器

以下示例显示了如何定义JsonFileItemWriter

@Bean
public JsonFileItemWriter<Trade> jsonFileItemWriter() {
   return new JsonFileItemWriterBuilder<Trade>()
                 .jsonObjectMarshaller(new JacksonJsonObjectMarshaller<>())
                 .resource(new ClassPathResource("trades.json"))
                 .name("tradeJsonFileItemWriter")
                 .build();
}

6.9.多文件输入

通常在一个文件中处理多个文件Step假设所有文件的格式相同,则MultiResourceItemReaderXML和平面文件处理都支持这种类型的输入。考虑目录中的以下文件:

file-1.txt file-2.txt被忽略.txt

file-1.txtfile-2.txt格式化相同的,出于商业原因,应一起处理。MultiResourceItemReader可用于在两个文件中读出由使用通配符,如图以下示例:

Java配置
@Bean
public MultiResourceItemReader multiResourceReader() {
	return new MultiResourceItemReaderBuilder<Foo>()
					.delegate(flatFileItemReader())
					.resources(resources())
					.build();
}

引用的委托很简单FlatFileItemReader上面的配置从两个文件读取输入,处理回滚和重新启动方案。应该注意的是,与任何其他方法一样ItemReader,添加额外的输入(在这种情况下为文件)可能会在重新启动时引起潜在的问题。建议批处理作业使用其各自的目录,直到成功完成为止。

通过使用输入资源进行排序,MultiResourceItemReader#setComparator(Comparator) 以确保在重新启动方案中在作业运行之间保留资源排序。

6.10.数据库

像大多数企业应用程序样式一样,数据库是批处理的中央存储机制。但是,批处理与其他应用程序样式不同,这是由于系统必须使用的数据集的绝对大小。如果一条SQL语句返回100万行,则结果集可能将所有返回的结果保存在内存中,直到读取了所有行。Spring Batch针对此问题提供了两种类型的解决方案:

6.10.1.基于游标的ItemReader实现

通常,使用数据库游标是大多数批处理开发人员的默认方法,因为它是数据库解决“流式”关系数据问题的解决方案。Java ResultSet类实质上是一种用于操纵游标的面向对象的机制。A ResultSet将光标保持在当前数据行。调用nextResultSet移动此光标到下一行。基于Spring Batch游标的ItemReader 实现会在初始化时打开游标,并在每次调用时将游标向前移动一行read,返回一个可用于处理的映射对象。close然后调用方法以确保释放所有资源。Spring核心 JdbcTemplate通过使用回调模式来完全映射一个对象中的所有行来解决此问题。ResultSet并在将控制权返回给方法调用者之前关闭。但是,必须分批进行,直到步骤完成。下图显示了基于游标的ItemReader工作方式的一般示意图请注意,尽管示例使用SQL(因为SQL众所周知),但是任何技术都可以实现基本方法。

光标示例
图19.游标示例

本示例说明了基本模式。给定一个'FOO'表,该表具有三列: IDNAMEBAR,选择ID大于1但小于7的所有行。这会将光标的开始(行1)放在ID 2上。该行的结果应该是一个完全映射的Foo对象。read()再次调用将光标移动到FooID为3 的下一行。这些读取的结果在每个之后写入 read,从而可以垃圾回收对象(假设没有实例变量维护对其的引用)。

JdbcCursorItemReader

JdbcCursorItemReader是基于游标的技术的JDBC实现。它直接与a配合使用,ResultSet并且需要SQL语句针对从a获得的连接运行DataSource以以下数据库架构为例:

CREATE TABLE CUSTOMER (
   ID BIGINT IDENTITY PRIMARY KEY,
   NAME VARCHAR(45),
   CREDIT FLOAT
);

许多人喜欢为每行使用一个域对象,因此以下示例使用RowMapper接口的实现来映射CustomerCredit对象:

public class CustomerCreditRowMapper implements RowMapper<CustomerCredit> {

    public static final String ID_COLUMN = "id";
    public static final String NAME_COLUMN = "name";
    public static final String CREDIT_COLUMN = "credit";

    public CustomerCredit mapRow(ResultSet rs, int rowNum) throws SQLException {
        CustomerCredit customerCredit = new CustomerCredit();

        customerCredit.setId(rs.getInt(ID_COLUMN));
        customerCredit.setName(rs.getString(NAME_COLUMN));
        customerCredit.setCredit(rs.getBigDecimal(CREDIT_COLUMN));

        return customerCredit;
    }
}

由于JdbcCursorItemReader与共享关键接口JdbcTemplate,因此有一个例子可以用来与之JdbcTemplate对比来读取该数据ItemReader就本示例而言,假设CUSTOMER数据库中有1,000行第一个示例使用JdbcTemplate

//For simplicity sake, assume a dataSource has already been obtained
JdbcTemplate jdbcTemplate = new JdbcTemplate(dataSource);
List customerCredits = jdbcTemplate.query("SELECT ID, NAME, CREDIT from CUSTOMER",
                                          new CustomerCreditRowMapper());

运行前面的代码段后,该customerCredits列表包含1,000个 CustomerCredit对象。在query方法中,从获取连接 DataSource,对它运行提供的SQL,并mapRow为中的每一行调用方法ResultSet将此与的方法进行对比 JdbcCursorItemReader,如以下示例所示:

JdbcCursorItemReader itemReader = new JdbcCursorItemReader();
itemReader.setDataSource(dataSource);
itemReader.setSql("SELECT ID, NAME, CREDIT from CUSTOMER");
itemReader.setRowMapper(new CustomerCreditRowMapper());
int counter = 0;
ExecutionContext executionContext = new Exec