swoole扩展自带的task进程功能非常强大,可以用来实现各种复杂的业务逻辑。本文主要介绍使用task/finish功能实现程序内的map-reduce并发任务处理。一个聊天服务经常会有群聊需求,我的群组和群组内成员,另外群组内成员需要按照积分排序,类似与这样的功能就可以使用swoole简单实现。
传统多线程方案 创建2个全局变量map,group_map以group_id为key,存储成员set。user_map以uid为key存储当前用户加入的所有group。
多线程环境下实际上不能直接操作这2个map,必须要加锁。当添加用户到一个组或者用户退出一个组时需要操作这2个map,必须要加锁。如果操作很频繁,实际上锁的碰撞是很严重的,这部分操作就会变成串行的。同时只有一个线程可以对map进行操作。锁的争抢也会带来大量线程切换浪费很多cpu资源。
lock.lock();group_map[group_id].append([uid, score]);user_map[uid].append(group_id);group_map.sortbyscore();lock.unlock();
基于swoole的task功能 基于swoole的task功能,可以将任务切片,然后hash投递到不同的task进程,完成任务。排序功能可以直接使用php提供的 splheap 实现,时间复杂度为o(logn),如果要实现查询功能,如根据uid查询用户加入的所有群组,根据groupid查询有哪些成员。可以先计算hash找到对应task进程,然后通过task/taskwait发送指令,直接读取进程的变量查找到信息。
$serv->set(array(task_worker_num => 24));$serv->task(array(cmd => user, uid => $uid, gid => $gid, score => $score), $gid % $task_worker_num);$serv->task(array(cmd => group, uid => $uid, gid => $gid), $uid % $task_worker_num);class mymaxheap extends splheap{ public function compare($value1, $value2) { return ($value1['score'] - $value2['score']); }}function ontask($serv, $taskid, $srcworkerid, $data) { static $usermap = array(); static $groupmap = array(); if ($data['cmd'] == 'group') { if (!isset($groupmap[$data['gid']])) { $groupmap[$data['gid']] = new mymaxheap(); } $heap = $groupmap[$data['gid']]; $heap->insert(array(uid => $data['uid'], score => $data['score'])); } elseif ($data['cmd'] == 'user') { $usermap[$data['uid']][] = $data['gid']; }}
由于task进程只有数组操作,所以是非阻塞的,只需要开启与cpu核数相同的进程数量即可。进程间无任何加锁争抢,性能非常好。swoole的task进程通信使用unixsocket,是内核提供的全内存通信方式无任何io,一写一读单进程可达100万/秒。虽然没有直接读变量的速度快,但性能也足够了。
————–伟大的分割线—————
php饭米粒(phpfamily) 由一群靠谱的人建立,愿为phper带来一些值得细细品味的精神食粮!
本文由 rango 独家授权 php饭米粒发布,转载请注明本来源信息和以下的二维码(长按可识别二维码关注):