现代应用开发离不开数据持久化和系统集成,Vert.x提供了一套完整的异步数据访问解决方案,支持各种数据库和消息系统。本节就来聊聊Vert.x如何与关系型数据库、NoSQL数据库以及消息队列进行交互,并展示如何利用Vert.x的异步特性构建高效的数据访问层。
异步SQL数据库访问
Vert.x提供了多种方式与SQL数据库交互,包括异步JDBC、Reactive SQL客户端等。这些客户端都是非阻塞的,不会阻塞事件循环线程。下面以Vert.x的异步JDBC客户端为例:
首先添加JDBC客户端依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-jdbc-client</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
然后创建数据库连接池并执行查询:
import io.vertx.core.Future;
import io.vertx.core.VerticleBase;
import io.vertx.core.Vertx;
import io.vertx.jdbcclient.JDBCConnectOptions;
import io.vertx.jdbcclient.JDBCPool;
import io.vertx.sqlclient.*;
public class DatabaseVerticle extends VerticleBase{
@Override
public Future<?> start() {
JDBCConnectOptions connectOptions = new JDBCConnectOptions()
.setJdbcUrl("jdbc:mysql://localhost:3366/mysql")
.setUser("root")
.setPassword("root");
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(16);
Pool pool = JDBCPool.pool(vertx, connectOptions, poolOptions);
Query<RowSet<Row>> query = pool.query("SELECT * FROM user");
query.execute()
.onFailure(e -> {
System.err.println("查询失败:" +e.getCause());
})
.onSuccess(rows -> {
for (Row row : rows) {
System.out.println(row.toJson());
}
});
return Future.succeededFuture(null);
}
public static void main(String[] args) {
Vertx.vertx().deployVerticle(new DatabaseVerticle());
}
}
NoSQL数据库集成
Vert.x提供了多种NoSQL数据库的异步客户端,包括MongoDB、Redis等。以下是使用Vert.x的MongoDB客户端的示例:
添加MongoDB客户端依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-mongo-client</artifactId>
</dependency>
使用MongoDB客户端:
import io.vertx.ext.mongo.MongoClient;
JsonObject config = new JsonObject()
// .put("host", "localhost")
// .put("port", 27017)
.put("connection_string", "mongodb://localhost:27017")
.put("db_name", "mydb");
MongoClient mongoClient = MongoClient.createShared(vertx, config);
// 插入文档
JsonObject document = new JsonObject()
.put("title", "Vert.x in Action")
.put("author", "John Doe")
.put("year", 2025);
mongoClient.insert("books", document).onComplete(res -> {
if (res.succeeded()) {
String id = res.result();
System.out.println("Inserted book with id: " + id);
} else {
res.cause().printStackTrace();
}
});
// 查询文档
JsonObject query = new JsonObject().put("author", "John Doe");
mongoClient.find("books", query).onComplete(res -> {
if (res.succeeded()) {
for (JsonObject doc : res.result()) {
System.out.println("Found book: " + doc.getString("title"));
}
}else {
res.cause().printStackTrace();
}
});
Redis集成
Redis是流行的内存数据结构存储,Vert.x提供了全功能的异步Redis客户端:
添加Redis客户端依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-redis-client</artifactId>
</dependency>
使用Redis客户端:
import io.vertx.redis.client.Redis;
import io.vertx.redis.client.RedisAPI;
import io.vertx.redis.client.RedisOptions;
RedisOptions redisOptions = new RedisOptions();
// 不填默认就是 redis://localhost:6379
redisOptions.addConnectionString("redis://localhost:6379");
// redisOptions.setPassword("password");
Redis client = Redis.createClient(vertx, redisOptions);
RedisAPI redis = RedisAPI.api(client);
String key = "hello";
String value = "cxl";
return client.connect()
.compose(conn -> redis.set(Arrays.asList(key, value))
.compose(v -> {
System.out.println("key stored");
return redis.get(key);
}))
.onSuccess(result -> {
System.out.println("Retrieved value: " + result);
}).onComplete((response, throwable) -> {
if (throwable != null) {
throwable.printStackTrace();
}
});
消息队列集成
Vert.x与消息队列的集成非常强大,特别是通过事件总线(Event Bus)可以轻松实现分布式消息传递。Vert.x还提供了专门的Kafka、RabbitMQ等消息系统的客户端。
事件总线(Event Bus)
事件总线是Vert.x的神经系统,可以在不同Verticle之间传递消息,甚至可以在集群中的不同节点间传递消息。
// 接收消息
vertx.eventBus().consumer("demo.eventbus.example", message -> {
System.out.println("Received message: " + message.body());
});
// 发送消息
vertx.eventBus().publish("demo.eventbus.example", "Hello, event bus!");
Kafka集成
对于更复杂的消息系统,Vert.x提供了Kafka客户端:
添加Kafka客户端依赖:
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-kafka-client</artifactId>
</dependency>
使用Kafka生产者与消费者:
import io.vertx.kafka.client.producer.KafkaProducer;
import io.vertx.kafka.client.producer.KafkaProducerRecord;
// 生产者配置
Map<String, String> producerConfig = new HashMap<>();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = KafkaProducer.create(vertx, producerConfig);
// 发送消息
KafkaProducerRecord<String, String> record =
KafkaProducerRecord.create("my-topic", "message-key", "message-value");
producer.write(record, done -> {
if (done.succeeded()) {
System.out.println("Message sent");
}
});
// 消费者配置
Map<String, String> consumerConfig = new HashMap<>();
consumerConfig.put("bootstrap.servers", "localhost:9092");
consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerConfig.put("group.id", "my-group");
consumerConfig.put("auto.offset.reset", "earliest");
consumerConfig.put("enable.auto.commit", "false");
// 创建消费者
KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerConfig);
// 订阅主题并处理消息
consumer.handler(record -> {
System.out.println("Received: " + record.value());
});
consumer.subscribe("my-topic", ar -> {
if (ar.succeeded()) {
System.out.println("Subscribed");
}
});
响应式编程与RxJava集成
为了更优雅地处理异步操作链,Vert.x支持与RxJava集成,提供了更函数式的编程风格:
import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.ext.web.codec.BodyCodec;
public class RxVerticle extends AbstractVerticle {
@Override
public void start() {
WebClient client = WebClient.create(vertx);
client.get(8080, "localhost", "/")
.as(BodyCodec.string())
.rxSend()
.map(response -> {
if (response.statusCode() != 200) {
throw new RuntimeException("HTTP error");
}
return response.body();
})
.subscribe(
body -> System.out.println("Response: " + body),
err -> System.err.println("Error: " + err.getMessage())
);
}
}
通过以上示例,我们可以看到Vert.x提供了全面的数据访问和系统集成能力,所有这些都建立在异步非阻塞的基础上。这使得Vert.x应用在处理大量并发数据操作时仍能保持高性能和低资源消耗。在往后的话就是Vert.x在微服务架构中的应用,包括服务发现、负载均衡和分布式配置等高级主题了,有兴趣或机会再上手吧。