6.2: Code: Locks

注释:该内容基于原版课程的textbook和源代码,在大模型辅助翻译的基础上进行的整理。

Xv6 有两种类型的锁:自旋锁和睡眠锁。Xv6 将自旋锁表示为 struct spinlock(kernel/spinlock.h:2)。
该结构中重要的字段是 locked,当锁可用时该字段为零,当锁被持有时该字段为非零。
从逻辑上讲,Xv6 应该通过执行如下代码来获取锁:

21 void
22 acquire(struct spinlock *lk) // does not work!
23 {
24 for(;;) {
25 if(lk->locked == 0) {
26 lk->locked = 1;
27 break;
28 }
29 }
30 }

不幸的是,这种实现无法保证在多处理器上的互斥性。可能会出现两个 CPU 同时到达第 25 行,看到 lk->locked 为零,然后都通过执行第 26 行来获取锁。
这时,两个不同的 CPU 都持有锁,这违反了互斥性原则。我们需要一种方法将第 25 行和第 26 行作为一个原子(即不可分割)步骤来执行。

由于锁被广泛使用,多核处理器通常提供实现第 25 行和第 26 行的原子版本的指令。在 RISC-V 上,这条指令是 amoswap r, a。
amoswap 从内存地址 a 读取值,将寄存器 r 的内容写入该地址,并将读取到的值存入 r。
也就是说,它在寄存器和内存地址之间交换内容。它以原子的方式执行这一序列,使用特殊硬件防止在读写之间任何其他 CPU 访问该内存地址。

Xv6 的 acquire(kernel/spinlock.c:22)使用了可移植的 C 库调用 __sync_lock_test_and_set,其本质上等同于 amoswap 指令;
返回值是 lk->locked 的旧(交换前)内容。acquire 函数将交换操作包装在一个循环中,重复(自旋)直到获取锁。每次循环将 1 交换到 lk->locked 并检查先前的值;如果先前的值为 0,则说明已获取到锁,此时交换操作会将 lk->locked 设为 1。
如果先前的值为 1,则表示其他 CPU 持有该锁,而我们原子地将 1 交换到 lk->locked 的操作并未改变其值。

一旦获取到锁,acquire 会记录获取锁的 CPU 以便调试。lk->cpu 字段受锁保护,仅在持有锁时才能修改。

release(kernel/spinlock.c:47)是 acquire 的反操作:它清除 lk->cpu 字段并释放锁。从概念上讲,释放锁只需要将 lk->locked 赋值为 0。
C 标准允许编译器用多个存储指令实现赋值,因此 C 语言中的赋值相对于并发代码可能是非原子的。相反,release 使用了 C 库函数 __sync_lock_release,该函数执行原子赋值操作。这个函数也等同于 RISC-V 的 amoswap 指令。

一下代码摘自: git://g.csail.mit.edu/xv6-labs-2024

// Acquire the lock.
// Loops (spins) until the lock is acquired.
void
acquire(struct spinlock *lk)
{
  push_off(); // disable interrupts to avoid deadlock.
  if(holding(lk))
    panic("acquire");

  // On RISC-V, sync_lock_test_and_set turns into an atomic swap:
  //   a5 = 1
  //   s1 = &lk->locked
  //   amoswap.w.aq a5, a5, (s1)
  while(__sync_lock_test_and_set(&lk->locked, 1) != 0)
    ;

  // Tell the C compiler and the processor to not move loads or stores
  // past this point, to ensure that the critical section's memory
  // references happen strictly after the lock is acquired.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Record info about lock acquisition for holding() and debugging.
  lk->cpu = mycpu();
}

// Release the lock.
void
release(struct spinlock *lk)
{
  if(!holding(lk))
    panic("release");

  lk->cpu = 0;

  // Tell the C compiler and the CPU to not move loads or stores
  // past this point, to ensure that all the stores in the critical
  // section are visible to other CPUs before the lock is released,
  // and that loads in the critical section occur strictly before
  // the lock is released.
  // On RISC-V, this emits a fence instruction.
  __sync_synchronize();

  // Release the lock, equivalent to lk->locked = 0.
  // This code doesn't use a C assignment, since the C standard
  // implies that an assignment might be implemented with
  // multiple store instructions.
  // On RISC-V, sync_lock_release turns into an atomic swap:
  //   s1 = &lk->locked
  //   amoswap.w zero, zero, (s1)
  __sync_lock_release(&lk->locked);

  pop_off();
}

问题:有没有acquire获取锁时如果在使用状态会将当前进程让出使用权的使用方法?

回答:

有一种锁机制在获取锁失败时,会将当前进程让出 CPU 使用权,这样可以避免自旋锁的忙等待所导致的 CPU 资源浪费。这类锁被称为睡眠锁或阻塞锁。与自旋锁不同的是,睡眠锁在无法获取锁时会将线程或进程置于等待状态,释放 CPU 给其他进程使用。

在 C++11 或现代多线程编程中,可以使用诸如 std::mutex 来实现这种行为。std::mutex 是一种互斥锁,当获取锁失败时,线程会被阻塞,而不是进行忙等待。

示例:使用 std::mutex 实现阻塞锁 cpp 复制代码 #include #include #include

std::mutex mtx;

void critical_section(int id) { std::cout << "Thread " << id << " is trying to acquire the lock\n"; // 获取锁,失败时当前线程会阻塞直到锁可用 std::lock_guardstd::mutex lock(mtx); std::cout << "Thread " << id << " has acquired the lock\n";

// 模拟临界区操作
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "Thread " << id << " is releasing the lock\n";

}

int main() { std::thread t1(critical_section, 1); std::thread t2(critical_section, 2);

t1.join();
t2.join();

return 0;

}

工作原理:

阻塞等待:当一个线程尝试获取 std::mutex 时,如果锁已经被其他线程持有,该线程会被自动挂起,进入等待状态,直到锁被释放。这样,CPU 资源可以被调度给其他任务,而不是进行忙等待。 自动释放:std::lock_guardstd::mutex 是 RAII(资源获取即初始化)风格的锁,进入作用域时获取锁,离开作用域时自动释放锁。

对比自旋锁:
CPU 使用效率:自旋锁在等待时会持续占用 CPU,适合锁等待时间非常短的情况;
睡眠锁在等待时会将 CPU 让给其他任务,更适合锁等待时间可能较长的情况。

复杂度:睡眠锁实现需要操作系统内核或线程库支持,能有效地管理线程状态,减少忙等待。
在 Linux 内核中,还有一种机制叫 semaphores(信号量) 和 std::condition_variable,也可以用来实现类似的等待和唤醒机制。

附录: spinning和polling两个词的区别

Spinning 和 Polling 都是等待某个条件成立时进行的操作,但它们的用途和实现方式有所不同:

  1. Spinning(自旋) 定义:自旋是一种锁的实现方法,即当一个线程或进程尝试获取锁时,如果锁当前被其他线程持有,它会持续检查锁的状态(在循环中反复读取锁标志),直到锁可用。 特点: 常用于自旋锁,例如在多线程编程中,用于确保线程在短时间内等待锁时不放弃 CPU 使用权。 不让出 CPU:线程会持续占用 CPU 资源进行轮询检查。 适合短时间的锁等待,因为长时间自旋会浪费 CPU 资源。

  2. Polling(轮询) 定义:轮询是一种机制,线程或进程在间隔一段时间后不断检查某个条件是否满足,通常用于等待输入或状态改变。 特点: 常用于检查设备状态、网络连接或数据是否准备就绪等。 有可能让出 CPU:根据实现,轮询可以在两次检查之间进入睡眠或延迟,以减少 CPU 占用。 可以用于较长时间的等待或没有明确触发机制的条件检查。

区别总结: CPU 使用:
Spinning 会持续占用 CPU 进行检查,不会在等待过程中让出 CPU。 Polling 可以设计为在检查之间有延迟或睡眠,让出 CPU,降低资源消耗。

用途:
Spinning 多用于锁的实现,确保高效等待锁的短暂可用状态。 Polling 常用于轮询外部事件或条件,通常涉及 I/O 设备或非同步状态的检查。

性能:
Spinning 在锁等待时间短时性能好,但锁等待时间长会造成 CPU 资源浪费。 Polling 通过在检查间隔中让出 CPU 可以减少资源浪费,但增加了等待响应时间。

适用场景: Spinning 适合在多核处理器上使用,短时间等待锁资源的场景。 Polling 适合需要定期检查某个状态或条件但无需高实时性的场景,比如网络数据准备、传感器数据采集等。