前言 该文档是我在团队中负责集成测试模块时为团队成员编写的 API 接口文档,这次拿过来修改了一下,并已经将业务无关的代码剥离出来上传到 GitHub  上。主要是向大家展示一种流畅接口设计框架代码的模式。
所谓流畅接口,可以参考我之前翻译的 Martin Flower 的博客译文 。简单来说,流畅接口被设计为可读的 和流式的 ,使用起来几乎和自然语言一般流畅,并且配合 IDE 的智能提示,易于 API 使用者的理解和使用。譬如下面我编写的集成测试用例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public  class  InboundToOutboundExampleJobTest  extends  JobIntegrationTest      @Test      public  void  should_send_ACTT_and_receive_trigger_event_from_kafka ()           kafkaSuite.send(new  ResourceFile("sample/send/pek/ACTT.xml" ))                 .toTopic()                 .ofConfig("inbound_imf" );         SubjectEvent event = kafkaSuite                 .await().latestOne()                 .fromTopic()                 .ofConfig("trigger" )                 .toJavaObject(SubjectEvent.class);         assertThat(event.getEventName()).isEqualTo("FLOP_ACTT" );         assertThat(event.getGids().size()).isEqualTo(1 );         assertThat(event.getGids().get(0 )).isEqualTo("PEK_80664162170310656" );     } } 
代码读起来非常明了,KafkaSuite 是我编写的 Kafka 集成测试套件,负责将 xml 文件发送至 Kafka 的一个 Topic 中,该 Topic 配置在配置文件中,对应着 inbound_imf 项。同时 KafkaSuite 监听另一个 Topic trigger 的最新一条消息并转换成 Java 对象,对结果进行验证(其间过程 xml 文件会被其他模块解析并经过一系列的业务处理)。
上述代码中,assertThat().isEqualTo() 就是断言框架 assertj 的 API,它是一个非常优秀的流畅接口的框架,建议大家阅读其源代码。我在编写 Kafka 集成测试框架时就借鉴了这种编码模式。
集成测试 代码目前完成了 Kafka 集成测试的发送和监听功能。我们的流处理框架采用的是 Flink,将任务分解为一个个 Job 独立运行,所以集成测试首先得启动 Job,并且加载包括 Kafka IP地址等相关配置。为此我编写了 JobIntegrationTest 类,并规定所有集成测试用例必须继承该类。此外,KafkaSuite 是负责 Kafka 集成测试的套件,封装了各种流畅接口 API。
JobIntegrationTest 所有的集成测试必需继承 JobIntegrationTest 类,该类提供 final 的 setUp() 方法和 tearDown() 方法。
集成测试运行的流程是 SetUp -> All Tests -> TearDown:
setUp() 方法中使用了单独的线程启动 Job,所以不会阻塞主线程(测试方法)的运行 。
测试方法主体由集成测试使用者编写,如果用到 KafkaSuite 的 send 和 await 方法则会被阻塞,直到成功发送或者接收到消息 ,或者超时抛出异常 退出。
tearDown() 方法关闭 Job 和用到的资源。
 
KafkaSuite 目前集成测试实现了 Kafka 消息的阻塞发送 和阻塞监听 ,抽象出来的 KafkaSuite 提供了流畅接口以支持集成测试。kafkaSuite 作为 JobIntegrationTest 的 protected 字段可以直接在集成测试类中使用。我将 KafkaSuite 的流畅接口分为三大阶段:配置阶段(PrepareStep)、发送阶段(SendStep)和接受阶段(AwaitStep)。具体的流畅接口使用可以查看 KafkaSuiteTest 或者直接阅读源码 JavaDoc。
PrepareStep 由于 JobIntegrationTest 会加载配置文件并由此创建 KafakSuite 实例,所以正常情况下不需要独立配置 IP、端口等。但为提供给特殊需求使用,仍设计相关接口:
1 2 3 kafkaSuite.clusterIps("172.20.10.120:9092,172.20.10.152:9092,172.20.10.171:9092" )             .noSsl()             .send()  
SendStep KafkaSuite 发送消息是阻塞的,设置的超时时间是 10s,如果 10s 未发送成功则会抛出 SendTimeoutException 异常 ,这时候需要检查配置文件中的 IP 和 SSL 配置是否正确。
API 使用如下:
1 2 3 kafkaSuite.send(new  ResourceFile("kafka/test-suite.json" ))             .toTopic()             .ofConfig("INTEGRATION-TEST" ); 
send() 方法提供发送字符串、文件或数据流的接口:
1 2 3 4 5 6 7 public  interface  DSLSendStep      DSLSendToStep send (String value)  ;     DSLSendToStep send (ResourceFile file)  throws  ResourceFileNotFoundException ;     DSLSendToStep send (InputStream inputStream)  throws  IOException ; } 
toTopic() 方法可以直接 toTopic("xxx") 指定明确 Topic,但更加推荐从配置文件读取的写法:
1 2 3 4 5 public  interface  DSLSendToStep      void  toTopic (String topic)      DSLSentLoadConfigStep toTopic ()  ; } 
AwaitStep KafkaSuite 监听消息同样是阻塞的。设置的默认超时时间是 60s ,也提供自定义超时时间的接口如 await(300),单位为秒,超过这个时间未接收到消息则抛出 AwaitTimeoutException 。理论上我们都是先发再收,所以如果发送成功则代表 Kafka 连接不存在问题(也就是配置没有问题),接收不到就应该是业务代码有问题,这时候就需要 Debug 排查问题。
监听结果提供监听一条 和监听多条 的 API,但不管是使用哪一个,await() 后会取到 Kafka 该 Topic 的上一个 offset 以来的所有新消息,然后变更 offset。可以根据 Index 获取多条消息里的某一条,Index 从 0 开始计算。大致用法是:使用 await().latestOne() 获取最新一条,await().multi() 或者直接 await() 获取多条。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 public  interface  DSLAwaitIndexStep  extends  DSLAwaitMultiFromStep                     DSLAwaitOneFromStep latestOne ()  ;     DSLAwaitOneFromStep one ()  ;     DSLAwaitOneFromStep lastOne ()  ;                    DSLAwaitOneFromStep one (int  index)  ;     DSLAwaitOneFromStep firstOne ()  ;     DSLAwaitOneFromStep secondOne ()  ;     DSLAwaitOneFromStep thirdOne ()  ;                    DSLAwaitMultiFromStep all ()  ;     DSLAwaitMultiFromStep multi ()  ;                    @Override      DSLAwaitMultiLoadConfigStep fromTopic ()  ;     @Override      DSLAwaitMultiRecordsStep fromTopic (String topic)  ;     @Override      DSLAwaitMultiRecordsStep fromTopic (String topic, String groupId)  ; } 
下面是监听一条并且验证消息是否正确 的写法,这里使用了 toJavaObject() 转成了 Java 对象,这个是我提供的转换消息的 API,后面会有介绍。
1 2 3 4 5 6 7 8 9 10 11 12 kafkaSuite.send("{\"name\":\"jack\",\"age\":25}" )                 .toTopic()                 .ofConfig("INTEGRATION-TEST" );                  Person record = kafkaSuite                 .await().latestOne()                 .fromTopic()                 .ofConfig("INTEGRATION-TEST" )                 .toJavaObject(Person.class);                  assertThat(record.getName()).isEqualTo("jack" ); assertThat(record.getAge()).isEqualTo(25 ); 
由于有的业务场景会出口多条消息,所以接下来演示验证多条消息的正确写法 。提供了 fetchFirstOne() 和 fetchOne(int index) 根据 Index 获取多条中的一条等 API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 kafkaSuite.send("{\"name\": \"jack\", \"age\": 25}" )         .toTopic()         .ofConfig("INTEGRATION-TEST" ); kafkaSuite.send("{\"name\": \"tony\", \"age\": 20}" )         .toTopic()         .ofConfig("INTEGRATION-TEST" ); DSLAwaitMultiRecordsStep records = kafkaSuite         .await().multi()         .fromTopic()         .ofConfig("INTEGRATION-TEST" ); Person jack = records.fetchFirstOne().toJavaObject(Person.class); Person tony = records.fetchOne(1 ).toJavaObject(Person.class); assertThat(jack.getName()).isEqualTo("jack" ); assertThat(jack.getAge()).isEqualTo(25 ); assertThat(tony.getName()).isEqualTo("tony" ); assertThat(tony.getAge()).isEqualTo(20 ); 
最后结果的输出提供了转换和输出到文件的 API 。需要注意,输出到文件只是方便调试 Bug 时使用,不应该出现在正式的代码中;输出的文件被写入到 target/test-classes/resources 下。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 public  interface  DSLAwaitOneRecordStep  extends  DSLAwaitRecordStep , DSLAwaitOneRecordOutputStep , DSLAwaitOneRecordConvertStep      @Override      String toString ()  ;     @Override      JsonObject toJsonObject ()  ;     @Override      JSONArray toJsonArray ()  ;     @Override      void  toXml ()      @Override      <T> T toJavaObject (Class<T> clazz)  ;     @Override      void  writeAsTxt (String absolutePath)      @Override      void  writeAsTxt (ResourceFile resourceFile)  } 
代码结构 流畅接口 API 的设计一个重中之重就是接口和抽象类的使用 ,为了契合流式接口,有时不能按照面向对象的模式设计接口继承关系。简言之,在编写流畅接口 API 框架时,大致思路是先编写接口(Interface),按照流式的原则设计接口之间的继承关系,然后再编写具体的实现类。以下是我的代码结构,与 impl 文件夹并列的都是接口。具体代码详见我的 GitHub  项目。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 . ├── Assertions.java ├── JobIntegrationTest.java ├── KafkaSuite.java └── core     ├── dsl     │   ├── await     │   │   ├── DSLAwaitFromStep.java     │   │   ├── DSLAwaitIndexStep.java     │   │   ├── DSLAwaitLoadConfigStep.java     │   │   ├── DSLAwaitMultiFromStep.java     │   │   ├── DSLAwaitMultiLoadConfigStep.java     │   │   ├── DSLAwaitMultiRecordsConvertStep.java     │   │   ├── DSLAwaitMultiRecordsFetchOneStep.java     │   │   ├── DSLAwaitMultiRecordsStep.java     │   │   ├── DSLAwaitOneFromStep.java     │   │   ├── DSLAwaitOneLoadConfigStep.java     │   │   ├── DSLAwaitOneRecordConvertStep.java     │   │   ├── DSLAwaitOneRecordOutputStep.java     │   │   ├── DSLAwaitOneRecordStep.java     │   │   ├── DSLAwaitRecordStep.java     │   │   ├── DSLAwaitStep.java     │   │   └── impl     │   │       ├── AbstractDSLAwaitFromStep.java     │   │       ├── AbstractDSLAwaitLoadConfigStep.java     │   │       ├── DSLAwaitIndexStepImpl.java     │   │       ├── DSLAwaitMultiFromStepImpl.java     │   │       ├── DSLAwaitMultiLoadConfigStepImpl.java     │   │       ├── DSLAwaitMultiRecordsStepImpl.java     │   │       ├── DSLAwaitOneFromStepImpl.java     │   │       ├── DSLAwaitOneLoadConfigStepImpl.java     │   │       └── DSLAwaitOneRecordStepImpl.java     │   ├── send     │   │   ├── DSLSendStep.java     │   │   ├── DSLSendToStep.java     │   │   ├── DSLSentLoadConfigStep.java     │   │   └── impl     │   │       ├── DSLSendToStepImpl.java     │   │       └── DSLSentLoadConfigStepImpl.java     │   └── start     │       ├── DSLPrepareStep.java     │       └── DSLStartStep.java     └── exceptions         ├── AwaitTimeoutException.java         ├── InvalidIntegrationTestNameException.java         ├── SendNullValueException.java         └── SendTimeoutException.java