对non-lock queue的简单分析
(2012年的文章,归档在此)
今天看到 @梁斌penny 在打擂:http://coderpk.com
内容如下:
游戏规则: 比赛由pennyliang,就是本人梁斌同志坐庄,我提交baseline代码(可执行程序),和部分代码,方便统一游戏规则。 10亿数据(每个数据看作一个同志),1个队列,10个线程push,10个线程pop,走完一遍,考察总耗时,耗时最短的获胜。我的代码在我自己机器10亿数据排队进出,耗时1分28秒,内存峰值256MB(CPU16核,真8核那种,Intel(R) Xeon(R) CPU E5540 @ 2.53GHz) 于是下载代码看了一下,首先我们分析一下代码的设计:
static __inline__ unsigned long long rdtsc(void)
{
unsigned long long int x;
__asm__ volatile ("rdtsc" : "=A" (x));
return x;
}
首先我们要知道这个queue里面放的数据是timestamp,使用rdtsc这个汇编命令来取得(这里要小小吐槽一下,这个命令基本上已经deprecated了)。因此,你要实现的队列要操作的数据就应该是rdtsc取出的时间戳。
在main.cpp中,包含一个队列的struct:
struct lock_free_queue
它包含两个操作:
void push(unsigned long long pop_time)
bool pop()
它的实现没有给出,需要参赛选手自己来给出答案(这里面有还一个隐含的问题:你需要自己去实现队列的结构及相关的内存管理)。
然后,在main方法中,使用pthread创建线程,进行队列的push和pop操作:
pthread_create(thread_push_end,NULL,push_end,&lfq);
把整个代码的设计搞明白以后,我们就可以分析一下为什么这个实现如此之快了。当然,秘密肯定在实现部分,但是原代码我们是没有的。我们再仔细看一下游戏规则,并没有说不能反向工程对不对?于是拿出分析工具objdump:
$ objdump -d main > out
这样我们就将程序的汇编代码生成了。首先来看main的代码:
0000000000400940 <main>:
…
400976: e8 bd fc ff ff callq 400638 <malloc@plt>
…
40099f: e8 c4 fc ff ff callq 400668 <pthread_create@plt>
…
和main.cpp
的代码都是对应的:
pthread_t* thread_push = (pthread_t*) malloc(10*sizeof( pthread_t));
…
pthread_create(&thread_pop[i],NULL,pop,&lfq);
好了,知道了阅读方法,我们接着来看核心的pop及push的实现。从push开始:
00000000004008c0 <_Z4pushPv>:
4008c0: 31 f6 xor %esi,%esi
4008c2: b9 01 00 00 00 mov $0x1,%ecx
4008c7: 0f 31 rdtsc
4008c9: 48 c1 e2 20 shl $0x20,%rdx
4008cd: 89 c0 mov %eax,%eax
4008cf: 48 09 c2 or %rax,%rdx
4008d2: 48 89 c8 mov %rcx,%rax
4008d5: f0 0f c1 47 10 lock xadd %eax,0x10(%rdi)
4008da: 25 ff ff ff 01 and $0x1ffffff,%eax
4008df: 48 c1 e0 03 shl $0x3,%rax
4008e3: 48 03 47 18 add 0x18(%rdi),%rax
4008e7: 48 89 10 mov %rdx,(%rax)
4008ea: 48 89 c8 mov %rcx,%rax
4008ed: f0 0f c1 47 10 lock xadd %eax,0x10(%rdi)
4008f2: 25 ff ff ff 01 and $0x1ffffff,%eax
4008f7: 48 c1 e0 03 shl $0x3,%rax
4008fb: 48 03 47 18 add 0x18(%rdi),%rax
4008ff: 48 89 10 mov %rdx,(%rax)
400902: 48 89 c8 mov %rcx,%rax
400905: f0 0f c1 47 10 lock xadd %eax,0x10(%rdi)
40090a: 25 ff ff ff 01 and $0x1ffffff,%eax
40090f: 48 c1 e0 03 shl $0x3,%rax
400913: 48 03 47 18 add 0x18(%rdi),%rax
400917: 48 89 10 mov %rdx,(%rax)
40091a: 48 89 c8 mov %rcx,%rax
40091d: f0 0f c1 47 10 lock xadd %eax,0x10(%rdi)
400922: 25 ff ff ff 01 and $0x1ffffff,%eax
400927: 83 c6 01 add $0x1,%esi
40092a: 48 c1 e0 03 shl $0x3,%rax
40092e: 48 03 47 18 add 0x18(%rdi),%rax
400932: 81 fe 40 78 7d 01 cmp $0x17d7840,%esi
400938: 48 89 10 mov %rdx,(%rax)
40093b: 75 8a jne 4008c7 <_Z4pushPv+0x7>
40093d: f3 c3 repz retq
40093f: 90 nop
Push的代码比较有意思,有4段相同的东西:
lock xadd %eax,0x10(%rdi)
and $0x1ffffff,%eax
shl $0x3,%rax
add 0x18(%rdi),%rax
mov %rdx,(%rax)
mov %rcx,%rax
这说明什么?gcc把cpp的循环进行优化了:
for(int i=0;i<count_per_thread_push/4;++i)
lfq->push(now);
lfq->push(now);
lfq->push(now);
lfq->push(now);
}
此外,还有这里:
cmp $0x17d7840,%esi
jne 4008c7 <_Z4pushPv+0x7>
这个0x17d7840
转换成10进制是多少?答案:25000000
嘿嘿,没错,gcc在编译的时候把count_per_thread_push/4
给除好了。
这些都是gcc帮我们在把c的代码转成汇编时,优化的点。
好了,不讲费话,我们继续正题,这个代码中实现的高效的秘密在哪里?我们刚才其实已经得到了入栈的实现:
lock xadd %eax,0x10(%rdi)
and $0x1ffffff,%eax
shl $0x3,%rax
add 0x18(%rdi),%rax
mov %rdx,(%rax)
mov %rcx,%rax
这个对应:
lfq->push(now);
注意到lock xadd了吗?bingo! 这个就是高效的秘密了。为了验证我们的结论,可以去看看pop的实现,果然发现也是用lock xadd来实现的:
mov $0x1,%eax
lock xadd %eax,0x8(%rbx)
注意,入栈地址是:
0x10(%rdi)
出栈地址是:
0x8(%rbx)
此外,出栈时前面的一句也很重要:
mov $0x1,%eax
因此,整个实现的核心就是lock xadd的使用,对应到C中的实现代码,可以推测出实现方法类似于:
asm volatile
(
"lock\n\t"
"xadd %1, %0":
"+m"( *mem ), "=r"( r ):
"1"( val ):
"memory", "cc"
);
代码就分析到这里,再说就成了直接给答案了 :-) 总的来讲,这个代码的实现可优化的点还是非常多的,祝大家玩得开心!