Home Java 中自己实现 RPC 中 异步转同步
Post
Cancel

Java 中自己实现 RPC 中 异步转同步

核心实现原理

  1. RpcResponseFutureManager: 核心管理器,使用 ConcurrentHashMap 存储 CompletableFuture,实现请求ID与Future的映射
  2. 异步转同步流程:客户端发起请求时创建 Future
    • 异步发送 RPC 请求
    • 调用 future.get() 阻塞等待响应
    • 服务端处理完成后通过回调完成 Future
    • 客户端线程被唤醒,返回结果
  3. 关键特性:
    • ✅ 超时控制(30秒)
    • ✅ 异常处理
    • ✅ 自动清理过期请求
    • ✅ 支持纯异步和同步两种调用方式

核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
@Service
@RequiredArgsConstructor
public class RpcClient {

  private final RpcResponseFutureManager futureManager;
  private final RestTemplate restTemplate;

  /**
   * 同步调用RPC(异步转同步)
   */
  public RpcResponse syncCall(String method, Object... params) throws Exception {
    String requestId = UUID.randomUUID().toString();

    // 创建请求
    RpcRequest request = new RpcRequest(
      requestId,
      method,
      params,
      System.currentTimeMillis()
    );

    // 创建Future
    CompletableFuture<RpcResponse> future = futureManager.createFuture(requestId);

    // 异步发送请求
    CompletableFuture.runAsync(() -> {
      try {
        log.info("发送RPC请求: {}", requestId);
        Object obj = restTemplate.postForObject(
          "http://localhost:8080/rpc/invoke",
          request,
          Object.class
        );
        RpcResponse response = new RpcResponse();
        response.setRequestId(requestId);
        response.setResult(obj);
        futureManager.completeFuture(requestId, response);
      } catch (Exception e) {
        log.error("发送RPC请求失败", e);
        RpcResponse errorResponse = new RpcResponse();
        errorResponse.setRequestId(requestId);
        errorResponse.setError(e.getMessage());
        futureManager.completeFuture(requestId, errorResponse);
      }
    });

    // 同步等待结果
    try {
      RpcResponse response = future.get(30, TimeUnit.SECONDS);
      log.info("收到RPC响应: {}", requestId);
      return response;
    } catch (TimeoutException e) {
      log.error("RPC调用超时: {}", requestId);
      throw new Exception("RPC call timeout");
    } catch (InterruptedException | ExecutionException e) {
      log.error("RPC调用异常: {}", requestId, e);
      throw new Exception("RPC call failed: " + e.getMessage());
    }
  }
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public class RpcResponseFutureManager {

    // 存储请求ID和对应的Future
    private final ConcurrentHashMap<String, CompletableFuture<RpcResponse>> futureMap =
            new ConcurrentHashMap<>();

    /**
     * 创建一个Future并存储
     */
    public CompletableFuture<RpcResponse> createFuture(String requestId) {
        CompletableFuture<RpcResponse> future = new CompletableFuture<>();
        futureMap.put(requestId, future);
        return future;
    }

    /**
     * 完成一个Future(当响应到达时调用)
     */
    public void completeFuture(String requestId, RpcResponse response) {
        CompletableFuture<RpcResponse> future = futureMap.remove(requestId);
        if (future != null) {
            future.complete(response);
        }
    }

    /**
     * 获取Future
     */
    public CompletableFuture<RpcResponse> getFuture(String requestId) {
        return futureMap.get(requestId);
    }

    /**
     * 移除Future
     */
    public void removeFuture(String requestId) {
        futureMap.remove(requestId);
    }
}

weixin.png

公众号名称:怪味Coding
微信扫码关注或搜索公众号名称
This post is licensed under CC BY 4.0 by the author.

Java性能分析第四节 - Linux 下 JVM监控.

Java 并发编程 -ThreadPoolExecutor