[ Поиск ] - [ Пользователи ] - [ Календарь ]
Полная Версия: как я map-reduce и "многопоточность" изобрел
Invis1ble
Предисловие:
Пишите в личку принцип, кто разобрался
Далее мы поиграем всеми участниками дискуссии в игру "Отрефактори говнокод" xD

(хотя там особо ничего сложного нет, в этом и изюминка ИМХО :) )

есть 300к серверов с которых надо получить контент, распарсить его, собрать и обработать данные и положить в БД
узкое место - ожидание ответа от удаленного ресурса, некоторые за 0.5 сек отвечают, а некоторые по таймауту 20 сек отваливаются
если принять среднее время ответа за 2 сек, получаем:

2 * 302000 = 604000 сек = 604000 / 60 / 60 / 24 ~ 7 суток

Что если распараллелить?

На кашу в коде, богомерзкое нарушение MVC и кривые именования не обращайте внимание (код одноразовый), главное - сама суть.
Осторожно, многа букафф:

    public function action_parse_sites()
{
$directory = APPPATH . 'logs/parser/';
$threads_count = 20;

/**================ Map ================*/
if (!file_exists($directory . '1.dat'))
{
$db = Database::instance();
$db_expr_null = DB::expr('NULL');

$ids = DB::select('id')
->
from('sites')
->
where('description', 'IS', $db_expr_null)
->
where('keywords', 'IS', $db_expr_null)
->
or_where_open()
->
where('description', '=', '')
->
or_where('keywords', '=', '')
->
or_where_close()
->
execute($db)
->
as_array(null, 'id');

$ids_count = count($ids);
$size = ceil($ids_count / $threads_count);

for ($i = 0; $i < $threads_count; ++ $i)
{
file_put_contents($directory . ($i + 1) . '.dat', serialize(array_splice($ids, 0, $size)));
}
}


for ($i = 0; $i < $threads_count; ++ $i)
{
if (file_exists($directory . ($i + 1) . '.dat') && !file_exists($directory . ($i + 1) . '.lock'))
{
$this->_log_parser($i + 1, 'Started');
file_put_contents($directory . ($i + 1) . '.lock', '');
$this->_log_parser($i + 1, 'Locked');

try
{
$ids = unserialize(file_get_contents($directory . ($i + 1) . '.dat'));
$this->_log_parser($i + 1, 'Retrieved ' . count($ids) . ' IDs');
$j = 0;

/** ================ Reduce ================*/
while (($id = array_shift($ids)) !== false)
{
$site = ORM::factory('site', $id);

if (!$site->loaded())
continue;

try
{
$result = $site->_get_content();

$is_available = $result['content'] !== null;

if ($is_available)
{
$description = $site->parse_description($result['content']);
$site->description = $description !== '' ? $description: null;

$keywords = $site->parse_keywords($result['content']);
$site->keywords = $keywords !== '' ? $keywords : null;

$site
->values([
'is_available' => (int)$is_available,
'last_pinged_at' => time(),
])
->
update()
->
clear();
}
}

catch (Curl_Exception $exception) {}
catch (ORM_Validation_Exception $exception)
{
Kohana::$log->add(Log::ERROR, Kohana_Exception::text($exception));
$strace = Kohana_Exception::text($exception) . "\n--\n" . $exception->getTraceAsString();
Kohana::$log->add(Log::STRACE, $strace);
Kohana::$log->write();

continue;
}
catch (ErrorException $exception)
{
if (strstr($exception->getMessage(), 'mb_convert_encoding') !== false)
{
Kohana::$log->add(Log::ERROR, Kohana_Exception::text($exception));
$strace = Kohana_Exception::text($exception) . "\n--\n" . $exception->getTraceAsString();
Kohana::$log->add(Log::STRACE, $strace);
Kohana::$log->write();

continue;
}
else
{
throw $exception;
}
}


++ $j;

if (!($j % 100))
{
$this->_log_parser($i + 1, $j . ' IDs processed');
}
}


$this->_log_parser($i + 1, 'All IDs processed succesfully');

unlink($directory . ($i + 1) . '.dat');
unlink($directory . ($i + 1) . '.lock');

$this->_log_parser($i + 1, 'Service files removed');
break;
}
catch (Exception $exception)
{
$this->_log_parser($i + 1, 'Unexpected exception [' . $exception->getCode() . ']: ' . $exception->getMessage());

if (isset($ids) && !empty($ids))
{
file_put_contents($directory . ($i + 1) . '.dat', serialize($ids));
$this->_log_parser($i + 1, 'Unprocessed IDs stored (' . count($ids) . ' left)');
}
else
{
$this->_log_parser($i + 1, 'No IDs processed');
}

unlink($directory . ($i + 1) . '.lock');
$this->_log_parser($i + 1, 'Unlocked' . PHP_EOL . PHP_EOL);
throw $exception;
}
}
}
}


protected function _log_parser($thread, $message)
{
file_put_contents(APPPATH . 'logs/parser/' . $thread . '.log', '[ ' . date('d.m.Y H:i:s') . ' ] ' . $message . PHP_EOL, FILE_APPEND);
}



часть одного из 20 логов
Цитата
[ 18.07.2013 09:16:03 ] Started
[ 18.07.2013 09:16:03 ] Locked
[ 18.07.2013 09:16:03 ] Retrieved 12361 IDs
[ 18.07.2013 09:17:33 ] 100 IDs processed
[ 18.07.2013 09:19:09 ] 200 IDs processed
[ 18.07.2013 09:21:06 ] 300 IDs processed
[ 18.07.2013 09:22:25 ] 400 IDs processed
[ 18.07.2013 09:24:03 ] 500 IDs processed
[ 18.07.2013 09:26:23 ] 600 IDs processed
[ 18.07.2013 09:28:15 ] 700 IDs processed
[ 18.07.2013 09:29:33 ] 800 IDs processed
[ 18.07.2013 09:31:07 ] 900 IDs processed
[ 18.07.2013 09:33:29 ] 1000 IDs processed
[ 18.07.2013 09:34:57 ] 1100 IDs processed
[ 18.07.2013 09:36:38 ] 1200 IDs processed
[ 18.07.2013 09:38:20 ] 1300 IDs processed
[ 18.07.2013 09:40:02 ] 1400 IDs processed
[ 18.07.2013 09:41:42 ] 1500 IDs processed
[ 18.07.2013 09:43:27 ] 1600 IDs processed
[ 18.07.2013 09:44:57 ] 1700 IDs processed
[ 18.07.2013 09:46:38 ] 1800 IDs processed
[ 18.07.2013 09:48:01 ] 1900 IDs processed
[ 18.07.2013 09:50:46 ] 2000 IDs processed
[ 18.07.2013 09:52:14 ] 2100 IDs processed
[ 18.07.2013 09:53:35 ] 2200 IDs processed
[ 18.07.2013 09:55:12 ] 2300 IDs processed
[ 18.07.2013 09:57:09 ] 2400 IDs processed
[ 18.07.2013 09:58:46 ] 2500 IDs processed
[ 18.07.2013 10:00:30 ] 2600 IDs processed
[ 18.07.2013 10:02:07 ] 2700 IDs processed
[ 18.07.2013 10:04:13 ] 2800 IDs processed
[ 18.07.2013 10:06:12 ] 2900 IDs processed
[ 18.07.2013 10:07:27 ] 3000 IDs processed




Спустя 9 минут, 13 секунд Invis1ble написал(а):

прогнозируемое время обработки в 20 "потоков":
10:07:27 - 09:16:03 * (12361 / 3000) ~ 210 минут
вместо 3 суток, как если бы был запущен один экземпляр скрипта
(на самом деле на час примерно больше, вчера просто 45 к обработал данных из этих 302 к)



Спустя 14 минут, 2 секунды Invis1ble написал(а):
Запустил по крону 20 раз скрипт (раз в минуту) - все работает, каждый "поток" обрабатывает свою часть данных



Спустя 14 минут, 33 секунды Invis1ble написал(а):
Я велосипедист!

_____________

Профессиональная разработка на заказ

Я на GitHub | второй профиль

Быстрый ответ:

 Графические смайлики |  Показывать подпись
Здесь расположена полная версия этой страницы.
Invision Power Board © 2001-2024 Invision Power Services, Inc.