核心实现原理
- RpcResponseFutureManager: 核心管理器,使用 ConcurrentHashMap 存储 CompletableFuture,实现请求ID与Future的映射
- 异步转同步流程:客户端发起请求时创建 Future
- 异步发送 RPC 请求
- 调用 future.get() 阻塞等待响应
- 服务端处理完成后通过回调完成 Future
- 客户端线程被唤醒,返回结果
- 关键特性:
- ✅ 超时控制(30秒)
- ✅ 异常处理
- ✅ 自动清理过期请求
- ✅ 支持纯异步和同步两种调用方式
核心代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Slf4j
@Service
@RequiredArgsConstructor
public class RpcClient {
private final RpcResponseFutureManager futureManager;
private final RestTemplate restTemplate;
/**
* 同步调用RPC(异步转同步)
*/
public RpcResponse syncCall(String method, Object... params) throws Exception {
String requestId = UUID.randomUUID().toString();
// 创建请求
RpcRequest request = new RpcRequest(
requestId,
method,
params,
System.currentTimeMillis()
);
// 创建Future
CompletableFuture<RpcResponse> future = futureManager.createFuture(requestId);
// 异步发送请求
CompletableFuture.runAsync(() -> {
try {
log.info("发送RPC请求: {}", requestId);
Object obj = restTemplate.postForObject(
"http://localhost:8080/rpc/invoke",
request,
Object.class
);
RpcResponse response = new RpcResponse();
response.setRequestId(requestId);
response.setResult(obj);
futureManager.completeFuture(requestId, response);
} catch (Exception e) {
log.error("发送RPC请求失败", e);
RpcResponse errorResponse = new RpcResponse();
errorResponse.setRequestId(requestId);
errorResponse.setError(e.getMessage());
futureManager.completeFuture(requestId, errorResponse);
}
});
// 同步等待结果
try {
RpcResponse response = future.get(30, TimeUnit.SECONDS);
log.info("收到RPC响应: {}", requestId);
return response;
} catch (TimeoutException e) {
log.error("RPC调用超时: {}", requestId);
throw new Exception("RPC call timeout");
} catch (InterruptedException | ExecutionException e) {
log.error("RPC调用异常: {}", requestId, e);
throw new Exception("RPC call failed: " + e.getMessage());
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
@Component
public class RpcResponseFutureManager {
// 存储请求ID和对应的Future
private final ConcurrentHashMap<String, CompletableFuture<RpcResponse>> futureMap =
new ConcurrentHashMap<>();
/**
* 创建一个Future并存储
*/
public CompletableFuture<RpcResponse> createFuture(String requestId) {
CompletableFuture<RpcResponse> future = new CompletableFuture<>();
futureMap.put(requestId, future);
return future;
}
/**
* 完成一个Future(当响应到达时调用)
*/
public void completeFuture(String requestId, RpcResponse response) {
CompletableFuture<RpcResponse> future = futureMap.remove(requestId);
if (future != null) {
future.complete(response);
}
}
/**
* 获取Future
*/
public CompletableFuture<RpcResponse> getFuture(String requestId) {
return futureMap.get(requestId);
}
/**
* 移除Future
*/
public void removeFuture(String requestId) {
futureMap.remove(requestId);
}
}
