cxl
Published on 2025-07-17 / 9 Visits
0
0

Vert.x数据访问与集成

现代应用开发离不开数据持久化和系统集成,Vert.x提供了一套完整的异步数据访问解决方案,支持各种数据库和消息系统。本节就来聊聊Vert.x如何与关系型数据库、NoSQL数据库以及消息队列进行交互,并展示如何利用Vert.x的异步特性构建高效的数据访问层。

异步SQL数据库访问

Vert.x提供了多种方式与SQL数据库交互,包括异步JDBC、Reactive SQL客户端等。这些客户端都是非阻塞的,不会阻塞事件循环线程。下面以Vert.x的异步JDBC客户端为例:

首先添加JDBC客户端依赖:

  <dependency>
    <groupId>io.vertx</groupId>
    <artifactId>vertx-jdbc-client</artifactId>
  </dependency>
  
  <dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.33</version>
  </dependency>

然后创建数据库连接池并执行查询:

  import io.vertx.core.Future;
  import io.vertx.core.VerticleBase;
  import io.vertx.core.Vertx;
  import io.vertx.jdbcclient.JDBCConnectOptions;
  import io.vertx.jdbcclient.JDBCPool;
  import io.vertx.sqlclient.*;

  public class DatabaseVerticle extends VerticleBase{
        
    @Override
    public Future<?> start() {
  
      JDBCConnectOptions connectOptions = new JDBCConnectOptions()
        .setJdbcUrl("jdbc:mysql://localhost:3366/mysql")
        .setUser("root")
        .setPassword("root");
      PoolOptions poolOptions = new PoolOptions()
        .setMaxSize(16);
      Pool pool = JDBCPool.pool(vertx, connectOptions, poolOptions);
  
      Query<RowSet<Row>> query = pool.query("SELECT * FROM user");
      query.execute()
        .onFailure(e -> {
          System.err.println("查询失败:" +e.getCause());
        })
        .onSuccess(rows -> {
          for (Row row : rows) {
            System.out.println(row.toJson());
          }
        });
  
      return Future.succeededFuture(null);
    }

    public static void main(String[] args) {
      Vertx.vertx().deployVerticle(new DatabaseVerticle());
    }
  }

NoSQL数据库集成

Vert.x提供了多种NoSQL数据库的异步客户端,包括MongoDB、Redis等。以下是使用Vert.x的MongoDB客户端的示例:

添加MongoDB客户端依赖:

  <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-mongo-client</artifactId>
  </dependency>

使用MongoDB客户端:

  import io.vertx.ext.mongo.MongoClient;

  JsonObject config = new JsonObject()
  //      .put("host", "localhost")
  //      .put("port", 27017)
      .put("connection_string", "mongodb://localhost:27017")
      .put("db_name", "mydb");

    MongoClient mongoClient = MongoClient.createShared(vertx, config);

    // 插入文档
    JsonObject document = new JsonObject()
      .put("title", "Vert.x in Action")
      .put("author", "John Doe")
      .put("year", 2025);

    mongoClient.insert("books", document).onComplete(res -> {
      if (res.succeeded()) {
        String id = res.result();
        System.out.println("Inserted book with id: " + id);
      } else {
        res.cause().printStackTrace();
      }
    });

    // 查询文档
    JsonObject query = new JsonObject().put("author", "John Doe");
    mongoClient.find("books", query).onComplete(res -> {
      if (res.succeeded()) {
        for (JsonObject doc : res.result()) {
          System.out.println("Found book: " + doc.getString("title"));
        }
      }else {
        res.cause().printStackTrace();
      }
    });

Redis集成

Redis是流行的内存数据结构存储,Vert.x提供了全功能的异步Redis客户端

添加Redis客户端依赖:

  <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-redis-client</artifactId>
  </dependency>

使用Redis客户端:

  import io.vertx.redis.client.Redis;
  import io.vertx.redis.client.RedisAPI;
  import io.vertx.redis.client.RedisOptions;

  RedisOptions redisOptions = new RedisOptions();
  // 不填默认就是 redis://localhost:6379
  redisOptions.addConnectionString("redis://localhost:6379");
  // redisOptions.setPassword("password");

  Redis client = Redis.createClient(vertx, redisOptions);
  RedisAPI redis = RedisAPI.api(client);

  String key = "hello";
  String value = "cxl";

  return client.connect()
    .compose(conn -> redis.set(Arrays.asList(key, value))
      .compose(v -> {
        System.out.println("key stored");
        return redis.get(key);
      }))
    .onSuccess(result -> {
      System.out.println("Retrieved value: " + result);
    }).onComplete((response, throwable) -> {
      if (throwable != null) {
        throwable.printStackTrace();
      }
    });

消息队列集成

Vert.x与消息队列的集成非常强大,特别是通过事件总线(Event Bus)​可以轻松实现分布式消息传递。Vert.x还提供了专门的Kafka、RabbitMQ等消息系统的客户端。

事件总线(Event Bus)

事件总线是Vert.x的神经系统,可以在不同Verticle之间传递消息,甚至可以在集群中的不同节点间传递消息。

  // 接收消息
  vertx.eventBus().consumer("demo.eventbus.example", message -> {
      System.out.println("Received message: " + message.body());
    });

  // 发送消息
  vertx.eventBus().publish("demo.eventbus.example", "Hello, event bus!");

Kafka集成

对于更复杂的消息系统,Vert.x提供了Kafka客户端:

添加Kafka客户端依赖:

  <dependency>
      <groupId>io.vertx</groupId>
      <artifactId>vertx-kafka-client</artifactId>
  </dependency>

使用Kafka生产者与消费者:

  import io.vertx.kafka.client.producer.KafkaProducer;
  import io.vertx.kafka.client.producer.KafkaProducerRecord;
  
  // 生产者配置
  Map<String, String> producerConfig = new HashMap<>();
  producerConfig.put("bootstrap.servers", "localhost:9092");
  producerConfig.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  producerConfig.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  
  KafkaProducer<String, String> producer = KafkaProducer.create(vertx, producerConfig);
  
  // 发送消息
  KafkaProducerRecord<String, String> record = 
      KafkaProducerRecord.create("my-topic", "message-key", "message-value");
  
  producer.write(record, done -> {
      if (done.succeeded()) {
          System.out.println("Message sent");
      }
  });
  
  // 消费者配置
  Map<String, String> consumerConfig = new HashMap<>();
  consumerConfig.put("bootstrap.servers", "localhost:9092");
  consumerConfig.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  consumerConfig.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  consumerConfig.put("group.id", "my-group");
  consumerConfig.put("auto.offset.reset", "earliest");
  consumerConfig.put("enable.auto.commit", "false");
  
  // 创建消费者
  KafkaConsumer<String, String> consumer = KafkaConsumer.create(vertx, consumerConfig);
  
  // 订阅主题并处理消息
  consumer.handler(record -> {
      System.out.println("Received: " + record.value());
  });
  
  consumer.subscribe("my-topic", ar -> {
      if (ar.succeeded()) {
          System.out.println("Subscribed");
      }
  });

响应式编程与RxJava集成

为了更优雅地处理异步操作链,Vert.x支持与RxJava集成,提供了更函数式的编程风格:

import io.vertx.rxjava.core.AbstractVerticle;
import io.vertx.rxjava.ext.web.client.WebClient;
import io.vertx.rxjava.ext.web.codec.BodyCodec;

public class RxVerticle extends AbstractVerticle {
    
    @Override
    public void start() {
        WebClient client = WebClient.create(vertx);
        
        client.get(8080, "localhost", "/")
             .as(BodyCodec.string())
             .rxSend()
             .map(response -> {
                 if (response.statusCode() != 200) {
                     throw new RuntimeException("HTTP error");
                 }
                 return response.body();
             })
             .subscribe(
                 body -> System.out.println("Response: " + body),
                 err -> System.err.println("Error: " + err.getMessage())
             );
    }
}

通过以上示例,我们可以看到Vert.x提供了全面的数据访问和系统集成能力,所有这些都建立在异步非阻塞的基础上。这使得Vert.x应用在处理大量并发数据操作时仍能保持高性能和低资源消耗。在往后的话就是Vert.x在微服务架构中的应用,包括服务发现、负载均衡和分布式配置等高级主题了,有兴趣或机会再上手吧。


Comment