引入背景:假如我们每天有10000个订单生成,需要同步到仓储系统中去,以前做法是开启一个crontab去跑这些任务,但是发现总有感觉同步效率低,间隔时间都是分钟级别的。
解决方案测试:我们将同步订单的任务表添加一个hash作为key,作为分发条件,因为mysql中select如果做mod函数是用不到索引的,所以我们自己做随机hash,但是务必不需要范围太大,以免服务器资源不够,方法是根据hashkey投放到不同的进程中进行同步,测试代码如下
<?php /** * Created by PhpStorm. * User: xujun * Date: 2017/8/26 * Time: 9:37 */ //假定需要处理的数据如下 class Process{ public $mpid =0; public $max_precess =5; //代替从数据库中读取的内容 public $task = [ [ 'uid' =>1, 'uname' => 'bot' , 'hash' =>1, 'handle' => 'test' ], [ 'uid' =>2, 'uname' => 'bot1' , 'hash' =>2, 'handle' => 'test' ], [ 'uid' =>3, 'uname' => 'bot2' , 'hash' =>3, 'handle' => 'test' ], [ 'uid' =>4, 'uname' => 'bot3' , 'hash' =>4, 'handle' => 'test' ], [ 'uid' =>2, 'uname' => 'bot4' , 'hash' =>2, 'handle' => 'test' ], [ 'uid' =>3, 'uname' => 'bot5' , 'hash' =>3, 'handle' => 'test' ], [ 'uid' =>4, 'uname' => 'bot6' , 'hash' =>1, 'handle' => 'test' ], ]; public $works = []; public $swoole_table = NULL; //public $new_index=0; function test( $index , $task ){ print_r( "[" . date ( 'Y-m-d H:i:s' ). "]" . 'work-index:' . $index . '处理' . $task [ 'uname' ]. '完成' .PHP_EOL); } public function __construct(){ try { $this ->swoole_table = new swoole_table(1024); $this ->swoole_table->column( 'index' , swoole_table::TYPE_INT); //用于父子进程间数据交换 $this ->swoole_table->create(); swoole_set_process_name(sprintf( 'php-ps:%s' , 'master' )); $this ->mpid = posix_getpid(); $this ->run(); $this ->processWait(); } catch (\Exception $e ){ die ( 'ALL ERROR: ' . $e ->getMessage()); } } public function run(){ for ( $i =0; $i < $this ->max_precess; $i ++) { $this ->CreateProcess(); } } private function getTask( $index ){ $_return = []; foreach ( $this ->task as $v ){ if ( $v [ 'hash' ]== $index ){ $_return [] = $v ; } } return $_return ; } public function CreateProcess( $index =null){ if ( is_null ( $index )){ //如果没有指定了索引,新建的子进程,开启计数 $index = $this ->swoole_table->get( 'index' ); if ( $index === false){ $index = 0; } else { $index = $index [ 'index' ]+1; } print_r( $index ); } $this ->swoole_table->set( 'index' , array ( 'index' => $index )); $process = new swoole_process( function (swoole_process $worker ) use ( $index ){ swoole_set_process_name(sprintf( 'php-ps:%s' , $index )); $task = $this ->getTask( $index ); foreach ( $task as $v ){ call_user_func_array( array ( $this , $v [ 'handle' ]), array ( $index , $v )); } sleep(20); }, false, false); $pid = $process ->start(); $this ->works[ $index ]= $pid ; return $pid ; } public function rebootProcess( $ret ){ $pid = $ret [ 'pid' ]; $index = array_search ( $pid , $this ->works); if ( $index !==false){ $index = intval ( $index ); $new_pid = $this ->CreateProcess( $index ); echo "rebootProcess: {$index}={$new_pid} Done\n" ; return ; } throw new \Exception( 'rebootProcess Error: no pid' ); } public function processWait(){ while (1) { if ( count ( $this ->works)){ $ret = swoole_process::wait(); if ( $ret ) { $this ->rebootProcess( $ret ); } } else { break ; } } } } $process = new Process(); |
这里代码中,使用了swoole_table作为进程间共享的内存,为了分配index。以及当进程退出后,父进程通过wait重新拉起该进程任务。
测试截图
进程ps
结果 休眠20s后退出后会被自动拉起
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持脚本之家。