对non-lock queue的简单分析

(2012年的文章,归档在此)

今天看到 @梁斌penny 在打擂:http://coderpk.com

内容如下:

游戏规则: 比赛由pennyliang,就是本人梁斌同志坐庄,我提交baseline代码(可执行程序),和部分代码,方便统一游戏规则。 10亿数据(每个数据看作一个同志),1个队列,10个线程push,10个线程pop,走完一遍,考察总耗时,耗时最短的获胜。我的代码在我自己机器10亿数据排队进出,耗时1分28秒,内存峰值256MB(CPU16核,真8核那种,Intel(R) Xeon(R) CPU E5540 @ 2.53GHz) 于是下载代码看了一下,首先我们分析一下代码的设计:

static __inline__ unsigned long long rdtsc(void)
{
  unsigned long long int x;
     __asm__ volatile ("rdtsc" : "=A" (x));
     return x;
}

首先我们要知道这个queue里面放的数据是timestamp,使用rdtsc这个汇编命令来取得(这里要小小吐槽一下,这个命令基本上已经deprecated了)。因此,你要实现的队列要操作的数据就应该是rdtsc取出的时间戳。

在main.cpp中,包含一个队列的struct:

struct lock_free_queue

它包含两个操作:

    void push(unsigned long long pop_time)
    bool pop()

它的实现没有给出,需要参赛选手自己来给出答案(这里面有还一个隐含的问题:你需要自己去实现队列的结构及相关的内存管理)。

然后,在main方法中,使用pthread创建线程,进行队列的push和pop操作:

pthread_create(thread_push_end,NULL,push_end,&lfq);

把整个代码的设计搞明白以后,我们就可以分析一下为什么这个实现如此之快了。当然,秘密肯定在实现部分,但是原代码我们是没有的。我们再仔细看一下游戏规则,并没有说不能反向工程对不对?于是拿出分析工具objdump:

$ objdump -d main > out

这样我们就将程序的汇编代码生成了。首先来看main的代码:

0000000000400940 <main>:
…
  400976:       e8 bd fc ff ff          callq  400638 <malloc@plt>
…
  40099f:       e8 c4 fc ff ff          callq  400668 <pthread_create@plt>
…

main.cpp的代码都是对应的:

pthread_t* thread_push = (pthread_t*) malloc(10*sizeof( pthread_t));

pthread_create(&thread_pop[i],NULL,pop,&lfq);

好了,知道了阅读方法,我们接着来看核心的pop及push的实现。从push开始:

00000000004008c0 <_Z4pushPv>:
  4008c0:       31 f6                   xor    %esi,%esi
  4008c2:       b9 01 00 00 00          mov    $0x1,%ecx
  4008c7:       0f 31                   rdtsc
  4008c9:       48 c1 e2 20             shl    $0x20,%rdx
  4008cd:       89 c0                   mov    %eax,%eax
  4008cf:       48 09 c2                or     %rax,%rdx
  4008d2:       48 89 c8                mov    %rcx,%rax
  4008d5:       f0 0f c1 47 10          lock xadd %eax,0x10(%rdi)
  4008da:       25 ff ff ff 01          and    $0x1ffffff,%eax
  4008df:       48 c1 e0 03             shl    $0x3,%rax
  4008e3:       48 03 47 18             add    0x18(%rdi),%rax
  4008e7:       48 89 10                mov    %rdx,(%rax)
  4008ea:       48 89 c8                mov    %rcx,%rax
  4008ed:       f0 0f c1 47 10          lock xadd %eax,0x10(%rdi)
  4008f2:       25 ff ff ff 01          and    $0x1ffffff,%eax
  4008f7:       48 c1 e0 03             shl    $0x3,%rax
  4008fb:       48 03 47 18             add    0x18(%rdi),%rax
  4008ff:       48 89 10                mov    %rdx,(%rax)
  400902:       48 89 c8                mov    %rcx,%rax
  400905:       f0 0f c1 47 10          lock xadd %eax,0x10(%rdi)
  40090a:       25 ff ff ff 01          and    $0x1ffffff,%eax
  40090f:       48 c1 e0 03             shl    $0x3,%rax
  400913:       48 03 47 18             add    0x18(%rdi),%rax
  400917:       48 89 10                mov    %rdx,(%rax)
  40091a:       48 89 c8                mov    %rcx,%rax
  40091d:       f0 0f c1 47 10          lock xadd %eax,0x10(%rdi)
  400922:       25 ff ff ff 01          and    $0x1ffffff,%eax
  400927:       83 c6 01                add    $0x1,%esi
  40092a:       48 c1 e0 03             shl    $0x3,%rax
  40092e:       48 03 47 18             add    0x18(%rdi),%rax
  400932:       81 fe 40 78 7d 01       cmp    $0x17d7840,%esi
  400938:       48 89 10                mov    %rdx,(%rax)
  40093b:       75 8a                   jne    4008c7 <_Z4pushPv+0x7>
  40093d:       f3 c3                   repz retq
  40093f:       90                      nop

Push的代码比较有意思,有4段相同的东西:

lock xadd %eax,0x10(%rdi)
and    $0x1ffffff,%eax
shl    $0x3,%rax
add    0x18(%rdi),%rax
mov    %rdx,(%rax)
mov    %rcx,%rax

这说明什么?gcc把cpp的循环进行优化了:

for(int i=0;i<count_per_thread_push/4;++i)
	lfq->push(now);
	lfq->push(now);
	lfq->push(now);
	lfq->push(now);
}

此外,还有这里:

cmp    $0x17d7840,%esi
jne    4008c7 <_Z4pushPv+0x7>

这个0x17d7840转换成10进制是多少?答案:25000000

嘿嘿,没错,gcc在编译的时候把count_per_thread_push/4给除好了。

这些都是gcc帮我们在把c的代码转成汇编时,优化的点。

好了,不讲费话,我们继续正题,这个代码中实现的高效的秘密在哪里?我们刚才其实已经得到了入栈的实现:

lock xadd %eax,0x10(%rdi)
and    $0x1ffffff,%eax
shl    $0x3,%rax
add    0x18(%rdi),%rax
mov    %rdx,(%rax)
mov    %rcx,%rax

这个对应:

lfq->push(now);

注意到lock xadd了吗?bingo! 这个就是高效的秘密了。为了验证我们的结论,可以去看看pop的实现,果然发现也是用lock xadd来实现的:

mov    $0x1,%eax
lock xadd %eax,0x8(%rbx)

注意,入栈地址是:

0x10(%rdi)

出栈地址是:

0x8(%rbx)

此外,出栈时前面的一句也很重要:

mov    $0x1,%eax

因此,整个实现的核心就是lock xadd的使用,对应到C中的实现代码,可以推测出实现方法类似于:

asm volatile
(
    "lock\n\t"
    "xadd %1, %0":
    "+m"( *mem ), "=r"( r ):
    "1"( val ):
    "memory", "cc"
);

代码就分析到这里,再说就成了直接给答案了 :-) 总的来讲,这个代码的实现可优化的点还是非常多的,祝大家玩得开心!