vm/tests/gc_concurrent_tests.rs

636 lines
21 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

//! GC并发安全测试
//!
//! 使用loom测试GC并发标记、写屏障和标记栈的并发安全性
#[cfg(loom)]
mod loom_tests {
use loom::sync::Arc;
use loom::thread;
use vm_engine_jit::unified_gc::{LockFreeMarkStack, ShardedWriteBarrier, UnifiedGcConfig, UnifiedGC};
/// 测试无锁标记栈的并发安全性
#[test]
fn test_lockfree_mark_stack_concurrent() {
loom::model(|| {
let stack = Arc::new(LockFreeMarkStack::new(1000));
let mut handles = Vec::new();
// 创建多个线程并发推送和弹出
for i in 0..4 {
let stack_clone = Arc::clone(&stack);
let handle = thread::spawn(move || {
// 推送一些地址
for j in 0..100 {
let addr = (i * 100 + j) as u64;
let _ = stack_clone.push(addr);
}
// 弹出一些地址
for _ in 0..50 {
let _ = stack_clone.pop();
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 验证最终状态的一致性
let final_size = stack.size.load(std::sync::atomic::Ordering::Relaxed);
assert!(final_size <= 400); // 最多400个元素4个线程各推送100个
});
}
/// 测试分片写屏障的并发安全性
#[test]
fn test_sharded_write_barrier_concurrent() {
loom::model(|| {
let barrier = Arc::new(ShardedWriteBarrier::new(4)); // 4个分片
let mut handles = Vec::new();
// 创建多个线程并发记录写屏障
for i in 0..8 {
let barrier_clone = Arc::clone(&barrier);
let handle = thread::spawn(move || {
// 记录写屏障
for j in 0..100 {
let addr = (i * 100 + j) as u64;
barrier_clone.record_write(addr, addr + 0x1000);
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
});
}
/// 测试GC并发标记的安全性
#[test]
fn test_gc_concurrent_marking() {
loom::model(|| {
let config = UnifiedGcConfig {
concurrent_marking: true,
mark_stack_capacity: 1000,
write_barrier_shards: 4,
..Default::default()
};
let gc = Arc::new(UnifiedGC::new(config));
let mark_stack = Arc::new(LockFreeMarkStack::new(1000));
let write_barrier = Arc::new(ShardedWriteBarrier::new(4));
let mut handles = Vec::new();
// 创建标记线程
for _ in 0..2 {
let stack_clone = Arc::clone(&mark_stack);
let handle = thread::spawn(move || {
// 模拟标记操作
for i in 0..100 {
let addr = i as u64 * 0x1000;
let _ = stack_clone.push(addr);
}
});
handles.push(handle);
}
// 创建写屏障线程
for _ in 0..4 {
let barrier_clone = Arc::clone(&write_barrier);
let handle = thread::spawn(move || {
// 模拟写操作
for i in 0..50 {
let addr = i as u64 * 0x1000;
barrier_clone.record_write(addr, addr + 0x1000);
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
});
}
}
/// 非loom环境的并发测试使用标准库
#[cfg(not(loom))]
mod std_tests {
use std::sync::Arc;
use std::thread;
use vm_engine_jit::unified_gc::{LockFreeMarkStack, ShardedWriteBarrier};
/// 测试无锁标记栈的并发安全性(标准库版本)
#[test]
fn test_lockfree_mark_stack_concurrent_std() {
let stack = Arc::new(LockFreeMarkStack::new(1000));
let mut handles = Vec::new();
// 创建多个线程并发推送和弹出
for i in 0..4 {
let stack_clone = Arc::clone(&stack);
let handle = thread::spawn(move || {
// 推送一些地址
for j in 0..100 {
let addr = (i * 100 + j) as u64;
let _ = stack_clone.push(addr);
}
// 弹出一些地址
for _ in 0..50 {
let _ = stack_clone.pop();
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 验证最终状态的一致性
let final_size = stack.size.load(std::sync::atomic::Ordering::Relaxed);
assert!(final_size <= 400); // 最多400个元素
}
/// 测试分片写屏障的并发安全性(标准库版本)
#[test]
fn test_sharded_write_barrier_concurrent_std() {
let barrier = Arc::new(ShardedWriteBarrier::new(4)); // 4个分片
let mut handles = Vec::new();
// 创建多个线程并发记录写屏障
for i in 0..8 {
let barrier_clone = Arc::clone(&barrier);
let handle = thread::spawn(move || {
// 记录写屏障
for j in 0..100 {
let addr = (i * 100 + j) as u64;
barrier_clone.record_write(addr, addr + 0x1000);
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
}
/// 测试并发标记的正确性
///
/// 确保多个线程同时标记时不会丢失对象,所有对象都被正确标记
#[test]
fn test_concurrent_marking_correctness() {
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use vm_engine_jit::gc_marker::GcMarker;
use vm_engine_jit::unified_gc::{GCPhase, UnifiedGcStats};
let mark_stack = Arc::new(LockFreeMarkStack::new(10000));
let marked_set = Arc::new(RwLock::new(HashSet::new()));
let phase = Arc::new(AtomicU64::new(GCPhase::Marking as u64));
let stats = Arc::new(UnifiedGcStats::default());
// 准备测试对象1000个对象
let total_objects = 1000;
let objects: Vec<u64> = (0..total_objects).map(|i| i as u64 * 0x1000).collect();
// 将所有对象添加到标记栈
for &obj in &objects {
let _ = mark_stack.push(obj);
}
// 创建多个标记线程
let num_threads = 4;
let mut handles = Vec::new();
for _ in 0..num_threads {
let stack_clone = Arc::clone(&mark_stack);
let marked_clone = Arc::clone(&marked_set);
let phase_clone = Arc::clone(&phase);
let stats_clone = Arc::clone(&stats);
let handle = thread::spawn(move || {
let marker = GcMarker::new(
stack_clone,
marked_clone,
phase_clone,
stats_clone,
);
// 每个线程执行增量标记,直到完成
loop {
let (completed, _) = marker.incremental_mark(1000); // 1ms配额
if completed {
break;
}
// 短暂休眠,模拟实际场景
thread::sleep(std::time::Duration::from_micros(100));
}
});
handles.push(handle);
}
// 等待所有标记线程完成
for handle in handles {
handle.join().unwrap();
}
// 验证所有对象都被标记
let marked = marked_set.read().unwrap();
assert_eq!(
marked.len(),
total_objects,
"所有对象都应该被标记,实际标记了 {} 个,期望 {} 个",
marked.len(),
total_objects
);
// 验证每个对象都被标记
for &obj in &objects {
assert!(
marked.contains(&obj),
"对象 {:#x} 应该被标记",
obj
);
}
}
/// 测试并发清扫的正确性
///
/// 确保并行清扫时不会重复释放或遗漏对象
#[test]
fn test_concurrent_sweeping_correctness() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use vm_engine_jit::gc_sweeper::GcSweeper;
use vm_engine_jit::unified_gc::{GCPhase, UnifiedGcStats};
let sweep_list = Arc::new(Mutex::new(Vec::new()));
let phase = Arc::new(AtomicU64::new(GCPhase::Sweeping as u64));
let stats = Arc::new(UnifiedGcStats::default());
// 准备待清扫对象1000个对象
let total_objects = 1000;
{
let mut list = sweep_list.lock().unwrap();
for i in 0..total_objects {
list.push(i as u64 * 0x1000);
}
}
// 创建清扫器
let sweeper = GcSweeper::new(
Arc::clone(&sweep_list),
Arc::clone(&phase),
Arc::clone(&stats),
100, // 批次大小
);
// 使用并行清扫
let mut total_freed = 0;
loop {
let (completed, freed_count) = sweeper.incremental_sweep(1000); // 1ms配额
total_freed += freed_count;
if completed {
break;
}
// 短暂休眠
thread::sleep(std::time::Duration::from_micros(100));
}
// 验证所有对象都被释放
assert_eq!(
total_freed,
total_objects,
"所有对象都应该被释放,实际释放了 {} 个,期望 {} 个",
total_freed,
total_objects
);
// 验证清扫列表为空
let list = sweep_list.lock().unwrap();
assert_eq!(
list.len(),
0,
"清扫列表应该为空,实际还有 {} 个对象",
list.len()
);
}
/// 测试标记和清扫的并发安全性
///
/// 确保在标记阶段和清扫阶段之间的正确性,不会出现竞态条件
#[test]
fn test_marking_and_sweeping_concurrent_safety() {
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Mutex, RwLock};
use vm_engine_jit::gc_marker::GcMarker;
use vm_engine_jit::gc_sweeper::GcSweeper;
use vm_engine_jit::unified_gc::{GCPhase, UnifiedGcStats};
let mark_stack = Arc::new(LockFreeMarkStack::new(10000));
let marked_set = Arc::new(RwLock::new(HashSet::new()));
let sweep_list = Arc::new(Mutex::new(Vec::new()));
let phase = Arc::new(AtomicU64::new(GCPhase::Marking as u64));
let stats = Arc::new(UnifiedGcStats::default());
let marking_done = Arc::new(AtomicBool::new(false));
let total_objects = 500;
let objects: Vec<u64> = (0..total_objects).map(|i| i as u64 * 0x1000).collect();
// 添加对象到标记栈
for &obj in &objects {
let _ = mark_stack.push(obj);
}
// 标记线程
let mark_stack_clone = Arc::clone(&mark_stack);
let marked_set_clone = Arc::clone(&marked_set);
let phase_clone = Arc::clone(&phase);
let stats_clone = Arc::clone(&stats);
let marking_done_clone = Arc::clone(&marking_done);
let mark_handle = thread::spawn(move || {
let marker = GcMarker::new(
mark_stack_clone,
marked_set_clone,
phase_clone,
stats_clone,
);
loop {
let (completed, _) = marker.incremental_mark(1000);
if completed {
marking_done_clone.store(true, Ordering::Release);
break;
}
thread::sleep(std::time::Duration::from_micros(100));
}
});
// 等待标记完成
mark_handle.join().unwrap();
// 验证标记完成
assert!(
marking_done.load(Ordering::Acquire),
"标记应该完成"
);
// 准备清扫列表(只包含未标记的对象,这里简化处理,假设所有对象都被标记)
// 实际场景中,清扫列表应该包含所有对象,然后根据标记状态决定是否释放
{
let mut list = sweep_list.lock().unwrap();
// 添加一些未标记的对象(模拟)
for i in total_objects..(total_objects + 100) {
list.push(i as u64 * 0x1000);
}
}
// 切换到清扫阶段
phase.store(GCPhase::Sweeping as u64, Ordering::Release);
// 清扫线程
let sweeper = GcSweeper::new(
Arc::clone(&sweep_list),
Arc::clone(&phase),
Arc::clone(&stats),
50,
);
let mut total_freed = 0;
loop {
let (completed, freed_count) = sweeper.incremental_sweep(1000);
total_freed += freed_count;
if completed {
break;
}
thread::sleep(std::time::Duration::from_micros(100));
}
// 验证清扫完成
assert_eq!(
total_freed,
100,
"应该释放100个未标记对象"
);
}
/// 测试写屏障在并发标记时的正确性
///
/// 确保在标记过程中,写屏障能够正确记录修改的对象
#[test]
fn test_write_barrier_during_concurrent_marking() {
use std::collections::HashSet;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::RwLock;
use vm_engine_jit::gc_marker::GcMarker;
use vm_engine_jit::unified_gc::{GCPhase, UnifiedGcStats};
let mark_stack = Arc::new(LockFreeMarkStack::new(10000));
let marked_set = Arc::new(RwLock::new(HashSet::new()));
let write_barrier = Arc::new(ShardedWriteBarrier::new(4));
let phase = Arc::new(AtomicU64::new(GCPhase::Marking as u64));
let stats = Arc::new(UnifiedGcStats::default());
// 启用写屏障
write_barrier.set_enabled(true);
// 准备初始对象
let initial_objects: Vec<u64> = (0..100).map(|i| i as u64 * 0x1000).collect();
for &obj in &initial_objects {
let _ = mark_stack.push(obj);
}
// 标记线程
let mark_stack_clone = Arc::clone(&mark_stack);
let marked_set_clone = Arc::clone(&marked_set);
let phase_clone = Arc::clone(&phase);
let stats_clone = Arc::clone(&stats);
let mark_handle = thread::spawn(move || {
let marker = GcMarker::new(
mark_stack_clone,
marked_set_clone,
phase_clone,
stats_clone,
);
// 执行增量标记
for _ in 0..10 {
let _ = marker.incremental_mark(500);
thread::sleep(std::time::Duration::from_micros(50));
}
});
// 写操作线程(模拟应用程序在标记过程中修改对象)
let write_barrier_clone = Arc::clone(&write_barrier);
let write_handle = thread::spawn(move || {
// 在标记过程中进行写操作
for i in 0..50 {
let addr = (i as u64 * 0x1000) + 0x100;
let new_addr = addr + 0x2000;
write_barrier_clone.record_write(addr, new_addr);
thread::sleep(std::time::Duration::from_micros(10));
}
});
// 等待所有线程完成
mark_handle.join().unwrap();
write_handle.join().unwrap();
// 验证写屏障记录了修改
let modified = write_barrier.drain_modified();
assert!(
modified.len() > 0,
"写屏障应该记录了一些修改的对象"
);
}
/// 测试标记栈在极端并发情况下的正确性
///
/// 测试大量线程同时推送和弹出时的正确性
#[test]
fn test_mark_stack_extreme_concurrency() {
let stack = Arc::new(LockFreeMarkStack::new(100000));
let num_threads = 16;
let operations_per_thread = 1000;
let mut handles = Vec::new();
// 创建大量线程进行并发操作
for i in 0..num_threads {
let stack_clone = Arc::clone(&stack);
let handle = thread::spawn(move || {
// 每个线程执行推送和弹出操作
for j in 0..operations_per_thread {
let addr = (i * operations_per_thread + j) as u64;
let _ = stack_clone.push(addr);
}
// 弹出一些元素
for _ in 0..(operations_per_thread / 2) {
let _ = stack_clone.pop();
}
});
handles.push(handle);
}
// 等待所有线程完成
for handle in handles {
handle.join().unwrap();
}
// 验证最终状态的一致性
let final_size = stack.size.load(std::sync::atomic::Ordering::Relaxed);
let expected_min = num_threads * operations_per_thread / 2; // 每个线程至少留下一半
assert!(
final_size >= expected_min,
"最终大小应该至少为 {},实际为 {}",
expected_min,
final_size
);
}
/// 测试并行清扫的性能和正确性
///
/// 对比串行和并行清扫的性能差异,并验证正确性
#[test]
fn test_parallel_sweeping_performance_and_correctness() {
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Mutex;
use vm_engine_jit::gc_sweeper::GcSweeper;
use vm_engine_jit::unified_gc::{GCPhase, UnifiedGcStats};
let total_objects = 10000;
// 测试并行清扫
let sweep_list_parallel = Arc::new(Mutex::new(Vec::new()));
{
let mut list = sweep_list_parallel.lock().unwrap();
for i in 0..total_objects {
list.push(i as u64 * 0x1000);
}
}
let phase_parallel = Arc::new(AtomicU64::new(GCPhase::Sweeping as u64));
let stats_parallel = Arc::new(UnifiedGcStats::default());
let sweeper_parallel = GcSweeper::new(
Arc::clone(&sweep_list_parallel),
Arc::clone(&phase_parallel),
Arc::clone(&stats_parallel),
1000,
);
let start_parallel = std::time::Instant::now();
let mut total_freed_parallel = 0;
loop {
let (completed, freed_count) = sweeper_parallel.incremental_sweep_with_parallel(10000, true);
total_freed_parallel += freed_count;
if completed {
break;
}
}
let duration_parallel = start_parallel.elapsed();
// 测试串行清扫
let sweep_list_serial = Arc::new(Mutex::new(Vec::new()));
{
let mut list = sweep_list_serial.lock().unwrap();
for i in 0..total_objects {
list.push(i as u64 * 0x1000);
}
}
let phase_serial = Arc::new(AtomicU64::new(GCPhase::Sweeping as u64));
let stats_serial = Arc::new(UnifiedGcStats::default());
let sweeper_serial = GcSweeper::new(
Arc::clone(&sweep_list_serial),
Arc::clone(&phase_serial),
Arc::clone(&stats_serial),
1000,
);
let start_serial = std::time::Instant::now();
let mut total_freed_serial = 0;
loop {
let (completed, freed_count) = sweeper_serial.incremental_sweep_with_parallel(10000, false);
total_freed_serial += freed_count;
if completed {
break;
}
}
let duration_serial = start_serial.elapsed();
// 验证正确性:两种方式都应该释放所有对象
assert_eq!(
total_freed_parallel,
total_objects,
"并行清扫应该释放所有对象"
);
assert_eq!(
total_freed_serial,
total_objects,
"串行清扫应该释放所有对象"
);
// 验证性能:并行应该更快(在多核系统上)
println!(
"并行清扫耗时: {:?}, 串行清扫耗时: {:?}",
duration_parallel, duration_serial
);
}
}