服務報價 | 域名主機 | 網絡營銷 | 軟件工具| [加入收藏]
?熱線電話: 130-5800-8007
當前位置: 主頁 > php教程 > php教程 >

使用PHP SWOOLE 定時同步MySQL數據

時間:2017-04-09 14:00來源:未知 作者:最模板 點擊:
南寧公司和幾個分公司之間都使用了呼叫系統,然后現在需要做一個呼叫通話數據分析,由于分公司的呼叫服務器是在內網,通過技術手段映射出來,分公司到南寧之間的網絡不穩定,

南寧公司和幾個分公司之間都使用了呼叫系統,然后現在需要做一個呼叫通話數據分析,由于分公司的呼叫服務器是在內網,通過技術手段映射出來,分公司到南寧之間的網絡不穩定,所以需要把分公司的通話數據同步到南寧。

本身最簡單的方法就是直接配置MySQL的主從同步就可以同步數據到南寧來了。但是銷售呼叫系統那邊的公司不給MySQL權限我們。 所以這個方法只能放棄了。

于是我們干脆的想,使用PHP來實現定時一個簡易的PHP定時同步工具,然后PHP進程常駐后臺運行,所以首先就先到了一個PHP組件:SWOOLE,經過討論,分公司的每天半天生成的數據量最大在5000條左右,所以這個方案是可行,就這樣干。

我們使用PHP SWOOLE 做一個異步的定時任務系統。

本身MySQL數據庫的主從同步是通過解析Master庫中的binary-log來進行同步數據到從庫的。然而我們使用PHP來同步數據的時候,那么只能從master庫分批查詢數據,然后插入到南寧的slave庫來了。

這里我們使用的框架是 ThinkPHP 3.2 .

首先安裝PHP擴展: SWOOLE,因為沒有使用到特別的功能,所以這里我們使用pecl來快速安裝:

pecl install swoole

安裝完成后在 php.ini 里面加入 extension="swoole.so" 安裝完成后,我們使用 phpinfo()來檢查是否成功了.

?

安裝成功了,我們就來寫業務.

服務端

1、首先啟動一個后臺的服務端,監聽端口9501

public function index()
{
   $serv = new \swoole_server("0.0.0.0", 9501);
   $serv->set([
       'worker_num' => 1,//一般設置為服務器CPU數的1-4倍
       'task_worker_num' => 8,//task進程的數量
       'daemonize' => 1,//以守護進程執行
       'max_request' => 10000,//最大請求數量
       "task_ipc_mode " => 2 //使用消息隊列通信,并設置為爭搶模式
   ]);
   $serv->on('Receive', [$this, 'onReceive']);//接收任務,并投遞
   $serv->on('Task', [$this, 'onTask']);//可以在這個方法里面處理任務
   $serv->on('Finish', [$this, 'onFinish']);//任務完成時候調用
   $serv->start();
}

2、接收和投遞任務

public function onReceive($serv, $fd, $from_id, $data)
{
    //使用json_decode 解析任務數據
   $areas = json_decode($data,true);
   foreach ($areas as $area){
       //投遞異步任務
       $serv->task($area);
   }
}

3、任務執行,數據從master庫查詢和寫入到slave數據庫

public function onTask($serv, $task_id, $from_id, $task_data)
{
   $area = $task_data;//參數是地區編號
   $rows = 50; //每頁多少條
   //主庫地址,根據參數地區($area)編號切換master數據庫連接
   //從庫MySQL實例,根據參數地區($area)編號切換slave數據庫連接
   //由于程序是常駐內存的,所以MySQL連接可以使用長連接,然后重復利用。要使用設計模式的,可以使用對象池模式
    Code......

   //master 庫為分公司的數據庫,slave庫為數據同步到南寧后的從庫
   Code......

   //使用$sql獲取從庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1
   $slaveMaxIncrementId = ...;

   //使用$sql獲取主庫中最大的自增: SELECT MAX(id) AS maxid FROM ss_cdr_cdr_info limit 1
   $masterMaxIncrementId = ...;

   //如果相等的就不同步了
   if($slaveMaxIncrementId >= $masterMaxIncrementId){
       return false;
   }

   //根據條數計算頁數
   $dataNumber = ceil($masterMaxIncrementId - $slaveMaxIncrementId);
   $eachNumber = ceil($dataNumber / $rows);
   $left = 0;

   //根據頁數來進行分批循環進行寫入,要記得及時清理內存
   for ($i = 0; $i < $eachNumber; $i++) {
       $left = $i == 0 ? $slaveMaxIncrementId : $left + $rows;
       $right = $left + $rows;
       //生成分批查詢條件
       //$where = "id > $left AND <= $right";
       $masterData = ...;//從主庫查詢數據
       $slaveLastInsertId = ...;//插入到從庫
       unset($masterData,$slaveLastInsertId);
   }

   echo "New AsyncTask[id=$task_id]".PHP_EOL;
   $serv->finish("$area -> OK");
}

4、任務完成時候調用

public function onFinish($serv, $task_id, $task_data)
{
   echo "AsyncTask[$task_id] Finish: $task_data".PHP_EOL;
}

客戶端推送任務

到此基本完成,剩下來我們來寫客戶端任務推送

public function index()
{
   $client = new \swoole_client(SWOOLE_SOCK_TCP);
   if (!$client->connect('127.0.0.1', 9501, 1)) {
       throw new Exception('鏈接SWOOLE服務錯誤');
   }
   $areas = json_encode(['liuzhou','yulin','beihai','guilin']);
   //開始遍歷檢查
   $client->send($areas);
   echo "任務發送成功".PHP_EOL;
}

至此基本完成了,剩下的我們來寫一個shell腳本定時執行:/home/wwwroot/sync_db/crontab/send.sh

#!/bin/bash
PATH=/bin:/sbin:/usr/bin:/usr/sbin:/usr/local/bin:/usr/local/sbin:~/bin
export PATH

# 定時推送異步的數據同步任務
/usr/bin/php /home/wwwroot/sync_db/server.php home/index/index

使用crontab定時任務,我們把腳本加入定時任務

#設置每天12:30執行數據同步任務
30 12 * * * root /home/wwwroot/sync_db/crontab/send.sh
#設置每天19:00執行數據同步任務
0 19 * * * root /home/wwwroot/sync_db/crontab/send.sh

Tips: 最好推薦在里面加入寫日志操作,這樣好知道是任務推送、執行是否成功。

至此基本完成,程序有待優化~~~,各位看客有更好的方法歡迎提出。

(責任編輯:最模板)
頂一下
(1)
100%
踩一下
(0)
0%
------分隔線----------------------------
体彩22选5开奖结果