试手 RxJava 2.x 及对线程的初步理解

在进行数据流处理过程中,需要一个高效苗条的流处理组件,比如对输入流能进行分组(窗口),能进行流量控制(Back Pressure - 背压),这也就涉及到响应式编程,流处理框架。这方面如果直接基于 Akka actor 来构建 Akka ActorSystem 也是比较复杂,依赖的组件也不少。还有构筑在 Akka actor 之上的 Akka Streams,再往上的 Flink Streaming,它们都有像滑动,滚动窗口的概念,但是依赖更不得了。一个基本的 Flink Streaming 的项目会依赖到 45 M 以上的第三方组件,如果用它来写一个数据流处理的共享组件,那真是要命。Spring 5 也开始带上了自己的 Reactive-Streams 实现 Spring Reactor, 想要把它从 Spring 中单独抽离出也非易事。

Flink Streaming 组件依赖:org.apache.fling:flink-streaming-java_2.12:1.80, 会依赖于其他诸如 akka-stream, akka-actor, flink-core, flink-clients, scala-library 等非常多的东西

而另一个著名的响应式框架 RxJava 2 就清爽多了,完全没有第三方依赖,要说有也就是定义了四个接口的 reactive-streams(2 KB 大小),就自身那个  rxjava-2.2.9.jar 包只有 2.3 M,这才叫轻量级。因为它设计来是能被应用于 Android 客户端应用的,Andriod 上的 rxandriod-1.2.1.aar 只有 9 K。所以 RxJava 2.x 太适合用来写一些小的共享组件了。 阅读全文 >>

Java 与'嵌入式' PostgreSQL 数据库的单元测试

在我们对数据库 DAO 类进行单元测试时,通常不应该依赖于一个外部数据库,所以会选用特定比较接近于真实数据库类型的内存或嵌入式数据库,如 HSQLDB(HyperSQL), H2, Derby 等。但总难免会用到特定数据库的特性,这时候就无法用前述各种数据库进行测试了。非要单元测试中覆盖到所用的数据库特性的话可以选择用 docker,如 Testcontainers, 经过模块扩展,它可以由 docker 来启动许多种类型的数据库,MySQL, Postgres, Oracle-XE, MS SQL Server, Couchbase 等等,详情见 Database containers。刚了解到的是它的模块化的无限可能,像支持 Kafka Containers 和 Localstack Module 等。

这里就不走 Testcontainers 那条路 -- 要求构建服务器上也要有 docker。早先希望能找到一种嵌入式或内存 PostgreSQL 数据库,后来发现 PostgreSQL 未能提供 In-Process 和 In-Memory 的启动方式,好在 PostgreSQL 是开源,有人可以把它改造为小型的可由测试代码启停的本地数据库。有两个具有代表性的组件,分别是 OpenTable Embedded PostgreSQL ComponentEmbedded PostgreSQL Server,它们都号称是 Embedded,所谓嵌入式,其实是进测试进程外的数据库。

下面简单体验下两个组件的用法 阅读全文 >>

使用 Google Guava Striped 实现基于 Key 的并发锁

写 Java 代码至今,在应对可能冲突的共享资源操作时会尽量用 JDK 1.5 开始引入的并发锁(如 Lock 的各类实现类, ReentrantLock 等) 进行锁定,而不是原来的 synchronized 关键字强硬低性能锁。

这里是应用 JDK 1.5  的 Lock 的基本操作步骤

private Lock lock = new ReentrantLock();
private void operate() {
    // 安全操作 ....
    lock.lock();
    try {
        // 对共享资源的操作 ...
    } finally {
        lock.unlock();
    }
}

如此,operate() 就是一个线程安全的方法,任何对它的调用都安排到了一个队列里等着。但有时候上锁需要考虑更细的粒度,下面是一个演示案例,引出第一个问题

阅读全文 >>