MIT 6.S081 locks

Part 1 Memory allocator

改进内存分配器,实现per cpu freelist,减少在kalloc过程中的锁竞争。

kalloctest的调用栈

sysproc.c    proc.c         vm.c                     kalloc.c
sys_sbrk ->  growproc   |-> uvmalloc     ->          kalloc
                        |_  uvmdealloc -> uvmunmap -> kfree

初次运行kalloctest

$ kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem: #test-and-set 4684888 #acquire() 433086
lock: bcache: #test-and-set 0 #acquire() 2098
--- top 5 contended locks:
lock: kmem: #test-and-set 4684888 #acquire() 433086
lock: uart: #test-and-set 2622037 #acquire() 838
lock: proc: #test-and-set 1381255 #acquire() 258356
lock: proc: #test-and-set 1170755 #acquire() 258355
lock: proc: #test-and-set 1148248 #acquire() 258356
tot= 4684888
test1 FAIL

for each lock, the count of calls to acquire for that lock, and the number of times the loop in acquire tried but failed to set the lock. #test-and-set是自旋次数 #acquire() 是尝试获取锁的次数

需要改进空闲页链表的锁机制,每个CPU一个锁,对应一个空闲页链表 当一个cpu的freelist空了后,从另一个cpu中偷一个(注意加锁)

struct {
  struct spinlock lock;
  struct run *freelist;
  char name[7];
} kmem[NCPU];

static struct run * steal(int cpuid) {
    for (int i = (cpuid + 1) % NCPU; i != cpuid ; i = (i + 1) % NCPU) {
        acquire(&kmem[i].lock);  // 对被偷取的链表加锁
        if (kmem[i].freelist != 0) { // find a not empty list
            // find middle point of the list
            struct run *slow, *fast, *head = kmem[i].freelist;
            slow = kmem[i].freelist;
            fast = kmem[i].freelist;
            while (fast != 0 && fast->next != 0) {
                slow = slow->next;
                fast = fast->next->next;
            }
            // the slow point to the middle
            kmem[i].freelist = slow->next;  // 后半段作为当前的链表头
            slow->next = 0;  // 切断

            release(&kmem[i].lock);
            return head;  // 返回此链表的头作为偷取结果
        }
        release(&kmem[i].lock);
    }
    return 0;
}

// Allocate one 4096-byte page of physical memory.
// Returns a pointer that the kernel can use.
// Returns 0 if the memory cannot be allocated.
void *
kalloc(void)
{
    struct run *r;

    push_off();
    int id = cpuid();
    pop_off();

    acquire(&kmem[id].lock);
    r = kmem[id].freelist;
    if (r) {
        kmem[id].freelist = r->next;
        release(&kmem[id].lock);
    }
    else {
        release(&kmem[id].lock); // 注意释放当前链表的锁,防止死锁(拿着一个锁不放去获取别的锁)
        r = steal(id);
        if (r) {
            acquire(&kmem[id].lock);
            kmem[id].freelist = r->next;
            release(&kmem[id].lock);
        }
    }

    if (r)
        memset((char *) r, 5, PGSIZE); // fill with junk

    return (void *) r;
}

问题:如何给每个cpu初始化freelist hint:回收给当前正在运行的cpu的freelist

在kalloc时偷取其他cpu的freelist,"steal part of the other CPU's free list. " 注意当cpua在steal的时候,不能拿着自己的锁不放

void
kinit()
{
    for (int i = 0; i < NCPU; i++) {
        snprintf(kmem[i].name, 7, "kmem-%d", i);
        initlock(&kmem[i].lock, kmem[i].name);
        kmem[i].freelist = 0;
    }
//    initlock(&kmem.lock, "kmem");
    freerange(end, (void*)PHYSTOP);
}

void
kfree(void *pa)
{
  struct run *r;

  if(((uint64)pa % PGSIZE) != 0 || (char*)pa < end || (uint64)pa >= PHYSTOP)
    panic("kfree");

  // Fill with junk to catch dangling refs.
  memset(pa, 1, PGSIZE);

  push_off();
  int coreid = cpuid();
  pop_off();

  r = (struct run*)pa;

  acquire(&kmem[coreid].lock);
  r->next = kmem[coreid].freelist;
  kmem[coreid].freelist = r;
  release(&kmem[coreid].lock);
}

kalloctest运行结果

$ ./kalloctest
start test1
test1 results:
--- lock kmem/bcache stats
lock: kmem-0: #test-and-set 0 #acquire() 34211
lock: kmem-1: #test-and-set 0 #acquire() 198354
lock: kmem-2: #test-and-set 0 #acquire() 200456
lock: kmem-3: #test-and-set 0 #acquire() 2
lock: kmem-4: #test-and-set 0 #acquire() 2
lock: kmem-5: #test-and-set 0 #acquire() 2
lock: kmem-6: #test-and-set 0 #acquire() 2
lock: kmem-7: #test-and-set 0 #acquire() 2
lock: bcache: #test-and-set 0 #acquire() 342
--- top 5 contended locks:
lock: proc: #test-and-set 21545 #acquire() 509603
lock: virtio_disk: #test-and-set 19495 #acquire() 57
lock: proc: #test-and-set 12286 #acquire() 509606
lock: proc: #test-and-set 7114 #acquire() 509436
lock: proc: #test-and-set 5269 #acquire() 509494
tot= 0
test1 OK
start test2
total free number of pages: 32499 (out of 32768)
.....
test2 OK

Part 2 Buffer cache

bcachetest 创建多个子进程并发完成对文件的读写,竞争buffer cache。初次运行bcachetest结果如下:

同样按照part1的思路,将整个buf分到(按照blockno进行hash)固定大小的桶中,用时间戳来完成LRU,在buf.h中增加timestamp成员。在我的第一次实现中,桶的的大小固定。但是这种实现总是会出现在bget过程中出现panic: bget: no buffer。

#define BUCKET_NUM 13
#define NBUF_PER_BUCKET 3

struct {
  struct buf bucket[BUCKET_NUM][NBUF_PER_BUCKET];
  struct spinlock locks[BUCKET_NUM];
} bcache;

void
binit(void)
{
  struct buf *b;

//    initlock(&bcache.lock, "bcache");
    for (int i = 0; i < BUCKET_NUM; ++i) {
        // 初始化每个桶的lock
        initlock(&bcache.locks[i], "bcache.bucket");
        // 初始化桶中的buffer
        for (b = bcache.bucket[i]; b < bcache.bucket[i] + NBUF_PER_BUCKET; b++) {
            initsleeplock(&b->lock, "buffer");
            b->timestamp = ticks;
        }
    }
}

// Look through buffer cache for block on device dev.
// If not found, allocate a buffer.
// In either case, return locked buffer.
static struct buf*
bget(uint dev, uint blockno)
{
    struct buf *b;

    // Is the block already cached?
    int index = blockno % BUCKET_NUM;

    // acquire the bucket lock
    acquire(&bcache.locks[index]);

    for (b = bcache.bucket[index]; b < bcache.bucket[index] + NBUF_PER_BUCKET; b++) {
        if (b->dev == dev && b->blockno == blockno) {
            b->refcnt++;
            b->timestamp = ticks; // update timestamp
            release(&bcache.locks[index]);
            acquiresleep(&b->lock);
            return b;
        }
    }

    // Not cached.
    // Recycle the least recently used (LRU) unused buffer.
    int replace_i = -1;
    uint least_timestamp = ~0;

    for (int i = 0; i < NBUF_PER_BUCKET; ++i) {  // look
        b = &bcache.bucket[index][i];
        if (b->refcnt == 0 && b->timestamp < least_timestamp) {
            replace_i = i;
            least_timestamp = b->timestamp;
        }
    }
    if (replace_i != -1) {
        b = &bcache.bucket[index][replace_i];
        b->dev = dev;
        b->blockno = blockno;
        b->valid = 0;
        b->refcnt = 1;
        b->timestamp = ticks;

        release(&bcache.locks[index]);
        acquiresleep(&b->lock);
        return b;
    } else {
        panic("bget: no buffers");
    }
}

void
brelse(struct buf *b)
{
    if (!holdingsleep(&b->lock))
        panic("brelse");

    releasesleep(&b->lock);

    int index = b->blockno % BUCKET_NUM;
    acquire(&bcache.locks[index]);
    b->refcnt--;
    release(&bcache.locks[index]);
}

void
bpin(struct buf *b) {
    int index = b->blockno % BUCKET_NUM;
    acquire(&bcache.locks[index]);
    b->refcnt++;
    release(&bcache.locks[index]);
}

void
bunpin(struct buf *b) {
    int index = b->blockno % BUCKET_NUM;
    acquire(&bcache.locks[index]);
    b->refcnt--;
    release(&bcache.locks[index]);
}

第二次设计

bucket中的buf改用链表连接,保留原来的线性buf表,每次从buf链表中申请buf放入相对应的桶中,如果buf链表用完了,就从其他bucket中偷取refcnt为0的buf到当前的bucket中。

在这个过程中,锁的使用很巧妙,参考了网上的设计。

注意:

不能拿着一个小锁去申请获取其他小锁(哲学家进餐问题);

先加大锁,再拿小锁。在向其他桶借buf的过程中,要先加大锁,防止其他借buf的行为发生,再加小锁,锁定特定的两个bucket。

每次释放锁之后,任何事情都会发生,需要再检查一遍释放锁之前的事情是否改变。所以在具体实现时,把检查自己桶中的空buf和其他桶中的空buf放在一次加锁循环中进行。

在测试时,最好先make clean 再 make qemu