Но перспективы такие:
1. Событийная модель получения файла.
2. За счет событийной модели нет холостых выхлопов на циклы.
3. Отсутствие блокировки основного потока для чтения кусков.
4. Запрашиваем столько данных сколько нам нужно или насколько позволяет производительность диска.
5. Возможность динамического шейпирования.
Тесты проводил на не особо быстрой машинке, но при чтении файла и его записи получил в среднем 100 мегабайт в сек, при чтении из памяти в память получил около 400 мегабайт в сек (чет медленно, видно машинка такая).
Вот собсна говнокод:
shmop class
class SHMOP
{
private $shmop_key;
public static $header_size = 6; // Max 999 999 bytes recv
function __destruct()
{
if(!$this->shmop_key)
return;
$this->delete();
$this->close();
}
public function close()
{
if(shmop_close($this->shmop_key))
return true;
else
return false;
}
private function delete()
{
if(shmop_delete($this->shmop_key))
return true;
else
return false;
}
public function open($fullpath, $flags, $mode = 0, $block_size = 0)
{
// Add header size
$shmop_key = self::key($fullpath);
$shm_id = shmop_open($shmop_key, $flags, $mode, $block_size);
if(!$shm_id)
return false;
$this->shmop_key = $shm_id;
// Временный костыль
shmop_write($this->shmop_key, "\0", 0);
return true;
}
public function read()
{
// Check SpinLock
$command = shmop_read($this->shmop_key, 0, 1);
if($command === "\0")
return '';
// Get Header
$datasize = (int)shmop_read($this->shmop_key, 1, self::$header_size);
$data = shmop_read($this->shmop_key, self::$header_size+1, $datasize);
// release spinlock
shmop_write($this->shmop_key, "\0", 0);
return $data;
}
public function canwrite()
{
// Check SpinLock
if(shmop_read($this->shmop_key, 0, 1) === "\0")
return true;
else
return false;
}
public function write($data)
{
// Check SpinLock
$command = shmop_read($this->shmop_key, 0, 1);
if($command !== "\0")
return false;
// Check datasize header lenght and add zero symbol.
$datasize = strlen($data);
if(strlen($datasize) < self::$header_size)
$datasize .= "\0";
// Write Header
shmop_write($this->shmop_key, $datasize, 1);
// Write Data
shmop_write($this->shmop_key, $data, self::$header_size+1);
// Write spinlock
shmop_write($this->shmop_key, "\1", 0);
return true;
}
public function feof()
{
// Check SpinLock
if(shmop_read($this->shmop_key, 0, 1) === "\2")
return true;
else
return false;
}
public function sendeof()
{
// Check SpinLock
$command = shmop_read($this->shmop_key, 0, 1);
if($command !== "\0")
return false;
shmop_write($this->shmop_key, "\2", 0);
return true;
}
public static function key($fullpath = __FILE__)
{
return "0x123";
}
}
proc_client
<?php
$GLOBALS['child_pid'] = null;
$GLOBALS['dps'] = 0;
$descriptorspec = array(
0 => array("pipe", "r"), // stdin - канал, из которого дочерний процесс будет читать
1 => array("pipe", "w"), // stdout - канал, в который дочерний процесс будет записывать
2 => array("file", "./error-output.txt", "a") // stderr - файл для записи
);
$process = proc_open('/usr/local/bin/php proc_server.php', $descriptorspec, $pipes);
if(!is_resource($process))
die('could not open proc');
require_once('shmop2.class.php');
$GLOBALS['SHMOP'] = new SHMOP;
if(!$GLOBALS['SHMOP']->open('proc_server.php', 'c', 0644, 200000))
{
echo 'cannot create block';
exit;
}
$GLOBALS['file'] = fopen('out.avi', 'wb');
// Send my pid
fwrite($pipes[0], 'parent_pid:'.posix_getpid()."\n");
$fd = $pipes[1];
$base = event_base_new();
$event = event_new();
event_set($event, $fd, EV_READ | EV_PERSIST, "command", array($event, $base));
event_base_set($event, $base);
event_add($event);
$signal = event_new();
event_set($signal, SIGIO, EV_SIGNAL | EV_PERSIST, "IO_signal", array($signal, $base));
event_base_set($signal, $base);
event_add($signal);
$event_timer = event_timer_new();
event_timer_set($event_timer, 'handler_timer', $event_timer);
event_base_set($event_timer, $base);
event_timer_add($event_timer, 1000000);
event_add($event_timer);
event_base_loop($base);
function handler_timer($res, $status, $timer)
{
echo 'Data Per Sec: '.convert($GLOBALS['dps'])."\n";
$GLOBALS['dps'] = 0;
event_timer_add($timer, 1000000);
}
function IO_signal()
{
if(!$GLOBALS['SHMOP']->feof())
{
$data = $GLOBALS['SHMOP']->read();
$GLOBALS['dps'] += strlen($data);
if($data !== '')
{
//fwrite($GLOBALS['file'], $data);
posix_kill($GLOBALS['child_pid'], SIGIO);
}
}
else
{
fclose($GLOBALS['file']);
echo "close file\n";
}
}
function command($fd, $events, $arg)
{
$com = trim(fgets($fd));
$com_arr = explode(':', $com);
switch($com_arr[0])
{
case 'child_pid':
// Set parent pid
$GLOBALS['child_pid'] = $com_arr[1];
echo "begin file\n";
posix_kill($GLOBALS['child_pid'], SIGIO);
break;
}
}
function convert($size)
{
$unit=array('b','kb','mb','gb','tb','pb');
return @round($size/pow(1024,($i=floor(log($size,1024)))),2).' '.$unit[$i];
}
?>
proc_server.php
<?php
set_error_handler('my_error_handler');
function my_error_handler($no,$str,$file,$line)
{
file_put_contents('./err.log', $no.$str.$file.$line."\n", FILE_APPEND);
}
require_once('shmop2.class.php');
$GLOBALS['SHMOP'] = new SHMOP;
if(!$GLOBALS['SHMOP']->open('proc_server.php', 'w'))
{
echo 'cannot open block';
exit;
}
$GLOBALS['fakedata'] = '';
for($i=0;$i<32000;$i++)
{
$GLOBALS['fakedata'] .= '1';
}
$GLOBALS['file'] = fopen('in.avi', 'rb');
$GLOBALS['parent_pid'] = null;
$fd = STDIN;
$base = event_base_new();
$event = event_new();
event_set($event, $fd, EV_READ | EV_PERSIST, "command", array($event, $base));
event_base_set($event, $base);
event_add($event);
$signal = event_new();
event_set($signal, SIGIO, EV_SIGNAL | EV_PERSIST, "IO_signal", array($signal, $base));
event_base_set($signal, $base);
event_add($signal);
event_base_loop($base);
function IO_signal($fd, $events, $arg)
{
if(!feof($GLOBALS['file']))
{
if($GLOBALS['SHMOP']->canwrite())
{
$GLOBALS['SHMOP']->write($GLOBALS['fakedata']);
posix_kill($GLOBALS['parent_pid'], SIGIO);
}
}
else
{
if($GLOBALS['SHMOP']->sendeof())
posix_kill($GLOBALS['parent_pid'], SIGIO);
fclose($GLOBALS['file']);
}
}
function command($fd, $events, $arg)
{
// print the line
$com = trim(fgets($fd));
$com_arr = explode(':', $com);
switch($com_arr[0])
{
case 'parent_pid':
// Set parent pid
$GLOBALS['parent_pid'] = $com_arr[1];
// Send child pid
fwrite(STDOUT, 'child_pid:'.posix_getpid()."\n");
break;
}
}
?>
Еще нужно протестировать всё и упаковать в удобный класс, чтобы легко и просто можно было подключать для клиент-серверной коммуникации.
Особо не ругайте )
Спустя 3 минуты, 59 секунд (7.04.2012 - 02:33) I++ написал(а):
К слову, асинхронный не верное понятие, просто модный тренд, скорее не блокирующий в данном случае

Спустя 5 часов, 57 минут, 38 секунд (7.04.2012 - 08:31) I++ написал(а):
Продолжаю тему говнокода, очередной раз libevent прикол устроил, его колбэк не может принимать self в классе, ну что-же, это печально.
Client:
Server:
Теперь осталось добавить больше проверок и вывод инфы об ошибках если таковые возникнут.
Так же добавить возможность работы не только с 1 файлом или потоком.
Ну и косметический подход, задокументировать всё :D
<?php
class SHMOP
{
private static $shmop_key;
private static $PIPES;
private static $process;
private static $header_size = 6; // Max 999 999 bytes recv
private static $event_stdio;
private static $event_sigio;
private static $pid;
public static function init($type, $fullpath, $memory = null, &$base)
{
if($type === 'client')
{
if(!is_numeric($memory))
return false;
$descriptorspec = array(0 => array("pipe", "r"), 1 => array("pipe", "w"));
self::$process = proc_open('/usr/local/bin/php -f '.$fullpath, $descriptorspec, self::$PIPES);
if(!is_resource(self::$process))
return false;
if(!self::open($fullpath, 'c', 0644, $memory))
return false;
self::$event_stdio = event_new();
// костыль с SHMOP::stdio_command (Подумай как исправить)
event_set(self::$event_stdio, self::$PIPES[1], EV_READ | EV_PERSIST, __CLASS__.'::stdio_command');
event_base_set(self::$event_stdio, $base);
event_add(self::$event_stdio);
self::$event_sigio = event_new();
event_set(self::$event_sigio, SIGIO, EV_SIGNAL | EV_PERSIST, "IO_signal");
event_base_set(self::$event_sigio, $base);
event_add(self::$event_sigio);
// Send my pid
fwrite(self::$PIPES[0], 'parent_pid:'.posix_getpid()."\n");
return true;
}
else if($type === 'server')
{
if(!self::open($fullpath, 'w'))
return false;
self::$event_stdio = event_new();
// костыль с SHMOP::stdio_command (Подумай как исправить)
event_set(self::$event_stdio, STDIN, EV_READ | EV_PERSIST, __CLASS__.'::stdio_command');
event_base_set(self::$event_stdio, $base);
event_add(self::$event_stdio);
self::$event_sigio = event_new();
event_set(self::$event_sigio, SIGIO, EV_SIGNAL | EV_PERSIST, "IO_signal");
event_base_set(self::$event_sigio, $base);
event_add(self::$event_sigio);
return true;
}
else
return false;
}
public static function send_signal()
{
posix_kill(self::$pid, SIGIO);
}
public static function stdio_command($fd, $events)
{
$com = trim(fgets($fd));
$com_arr = explode(':', $com);
switch($com_arr[0])
{
case 'child_pid':
self::$pid = $com_arr[1];
break;
case 'parent_pid':
// Set parent pid
self::$pid = $com_arr[1];
// Send child pid
fwrite(STDOUT, 'child_pid:'.posix_getpid()."\n");
break;
}
}
function __destruct()
{
if(!self::$shmop_key)
return;
$this->delete();
$this->close();
}
public function close()
{
if(shmop_close(self::$shmop_key))
return true;
else
return false;
}
private function delete()
{
if(shmop_delete(self::$shmop_key))
return true;
else
return false;
}
private static function open($fullpath, $flags, $mode = 0, $block_size = 0)
{
// Add header size
$shmop_key = self::key($fullpath);
self::$shmop_key = shmop_open($shmop_key, $flags, $mode, $block_size);
if(!self::$shmop_key)
return false;
// Временный костыль
shmop_write(self::$shmop_key, "\0", 0);
return true;
}
public static function read()
{
// Check SpinLock
$command = shmop_read(self::$shmop_key, 0, 1);
if($command === "\0")
return '';
// Get Header
$datasize = (int)shmop_read(self::$shmop_key, 1, self::$header_size);
$data = shmop_read(self::$shmop_key, self::$header_size+1, $datasize);
// release spinlock
shmop_write(self::$shmop_key, "\0", 0);
return $data;
}
public static function canwrite()
{
// Check SpinLock
if(shmop_read(self::$shmop_key, 0, 1) === "\0")
return true;
else
return false;
}
public static function write($data)
{
// Check SpinLock
$command = shmop_read(self::$shmop_key, 0, 1);
if($command !== "\0")
return false;
// Check datasize header lenght and add zero symbol.
$datasize = strlen($data);
if(strlen($datasize) < self::$header_size)
$datasize .= "\0";
// Write Header
shmop_write(self::$shmop_key, $datasize, 1);
// Write Data
shmop_write(self::$shmop_key, $data, self::$header_size+1);
// Write spinlock
shmop_write(self::$shmop_key, "\1", 0);
return true;
}
public static function feof()
{
// Check SpinLock
if(shmop_read(self::$shmop_key, 0, 1) === "\2")
return true;
else
return false;
}
public static function sendeof()
{
// Check SpinLock
$command = shmop_read(self::$shmop_key, 0, 1);
if($command !== "\0")
return false;
shmop_write(self::$shmop_key, "\2", 0);
return true;
}
public static function key($fullpath = __FILE__)
{
return "0x123";
}
}
?>
Client:
<?php
$GLOBALS['dps'] = 0;
$GLOBALS['start_timer'] = 0;
require_once('shmop3.class.php');
$base = event_base_new();
$event_timer = event_timer_new();
event_timer_set($event_timer, 'handler_timer', $event_timer);
event_base_set($event_timer, $base);
event_timer_add($event_timer, 1000000);
event_add($event_timer);
var_dump(SHMOP::init('client', 'proc_server.php', 200000, $base));
event_base_loop($base);
function IO_signal()
{
if(!SHMOP::feof())
{
$data = SHMOP::read();
$GLOBALS['dps'] += strlen($data);
if($data !== '')
{
//fwrite($GLOBALS['file'], $data);
SHMOP::send_signal();
}
}
else
{
//fclose($GLOBALS['file']);
echo "close file\n";
}
}
function handler_timer($res, $status, $timer)
{
if($GLOBALS['start_timer'] == 0)
{
$GLOBALS['start_timer'] = 1;
// Start file
SHMOP::send_signal();
}
echo 'Data Per Sec: '.convert($GLOBALS['dps'])."\n";
$GLOBALS['dps'] = 0;
event_timer_add($timer, 1000000);
}
function convert($size)
{
$unit=array('b','kb','mb','gb','tb','pb');
return @round($size/pow(1024,($i=floor(log($size,1024)))),2).' '.$unit[$i];
}
?>
Server:
<?php
set_error_handler('my_error_handler');
function my_error_handler($no,$str,$file,$line)
{
file_put_contents('./err.log', $no.$str.$file.$line."\n", FILE_APPEND);
}
require_once('shmop3.class.php');
$GLOBALS['fakedata'] = '';
for($i=0;$i<190000;$i++)
{
$GLOBALS['fakedata'] .= '1';
}
$base = event_base_new();
SHMOP::init('server', 'proc_server.php', null, $base);
event_base_loop($base);
function IO_signal($fd, $events, $arg)
{
if(true)
{
if(SHMOP::canwrite())
{
SHMOP::write($GLOBALS['fakedata']);
SHMOP::send_signal();
}
}
else
{
if(SHMOP::sendeof())
SHMOP::send_signal();
}
}
?>
Теперь осталось добавить больше проверок и вывод инфы об ошибках если таковые возникнут.
Так же добавить возможность работы не только с 1 файлом или потоком.
Ну и косметический подход, задокументировать всё :D