Многопоточный сбор данных с использованием цепочек связанных cURL-запросов. Часть 1

Библиотека cURL — лучший друг разработчика парсеров, ботов и прочих средств онлайн-автоматизации. Она позволяет сделать многие сложные вещи простым и универсальным способом (достаточно лишь правильно настроить опции cURL). Но все оборачивается по-другому, когда дело доходит до многопоточности, то бишь параллельности выполнения запросов. Эта довольно «скользкая» тема почему-то у многих вызывает большие сложности с пониманием. Во всяком случае именно так было со мной, когда я впервые столкнулся с необходимостью распараллеливания запросов. Мое первое знакомство с multi-curl было весьма непродолжительным, сколь и печальным. Короче говоря, официальный мануал по мультикурлу в PHP я попросту не осилил.

…Но мир не без добрых людей. Оказалось, что существует множество оберток мультикурла, позволяющих пользоваться всеми его преимуществами, не вникая при этом в суть его магии. Одна из самых популярных таких оберток — PHP-класс Rolling Curl. Имея под капотом наиболее эффективную реализацию мультикурла, Rolling Curl предоставляет простой и удобный интерфейс для параллельного выполнения множественных HTTP запросов. С помощью этого класса я смог сразу же и без особых проблем применить многопоточность на практике, чему был несказанно рад. С тех пор я стал большим его поклонником… Собственно на этом можно было бы и закончить эту статью, а то и вовсе и не начинать, если бы… опять не одно «но».

…Но однажды мне понадобилось сделать нечто большее, чем просто скачиватель страниц. И тогда выяснилось, что Rolling Curl имеет один недостаток, который затрудняет его применение для целого ряда задач. Недостаток этот — отсутствие связей между запросами, в том числе между данными, полученными в результате этих запросов.

Далее я попытаюсь обосновать с помощью примеров необходимость таких связей.

В простых случаях отсутствие связей между запросами нам никак не мешает. Например, если нам нужно просто получить некоторые данные со страниц некоего сайта. Рассмотрим эту ситуацию подробнее. Для определенности будем считать, что каждая страница сайта — это некое сообщение, и нам нужно: а) скачать в несколько потоков все страницы сайта; б) вытянуть оттуда содержимое сообщений; и в) сохранить их вместе с соответствующими URL в виде строк в БД или в файле. Используя класс Rolling Curl, мы бы написали для решения этой задачи примерно такой код:

// список URL'ов страниц сайта
$a_urls = array(
    'http://example.com/page1.html',
    'http://example.com/page2.html',
    // ...
);

// колбек для обработки ответа от каждого запроса
function request_callback($response, $info, $request) {
    $msg = parse_msg($response);
    if ($msg) {
        add_data($request->url, $msg);
    }
}

function parse_msg($content) {
    // парсим сообщение из содержимого страницы
    // и записываем результат в переменную $result
    return $result;
    // ...
}

function add_data($url, $msg) {
    // сохраняем полученные данные (URL и сообщение)
    // как строку данных в БД или в файле
    // ...
}

// создаем объект RollingCurl и назначаем колбек для запросов
$rc = new RollingCurl('request_callback');
// кол-во потоков скачивания страниц
$rc->window_size = 20;
// цикл по списку URL'ов
foreach ($a_urls as $url) {
    // добавляем запрос текущего URL в очередь
    $rc->get($url);
}
// запускаем очередь запросов
$rc->execute();

Приведенный код полностью справляется с поставленной задачей и не нуждается ни в каких наворотах типа поддержки связей между запросами.

Давайте теперь усложним задачу. Предположим, что каждая из страниц сайта содержит ссылки на некие дополнительные ресурсы, которые тоже требуется скачать, обработать и присоединить к общим результатам. Для определенности будем считать, что дополнительные ресурсы — это блок комментариев к соответствующему сообщению, который подгружается динамически через AJAX запрос. Таким образом, на каждый основной запрос на получение страницы нам придется сделать один дополнительный запрос для получения комментариев. При этом нам придется позаботиться о том, чтобы результаты дополнительного запроса попали по назначению, т.е. оказались в одной строке с результатами основного запроса. А для этого необходимо каким-то образом передать в дополнительный запрос информацию о его источнике (т.е. URL страницы сообщения) с тем, чтобы потом, при обработке результатов этого запроса, найти соответствующие результаты основного запроса и объединить их в одной строке. Другими словами нам необходимо обеспечить связь между основными и их производными (дополнительными) запросами.

Вот мы и подобрались к сути описываемой проблемы. К сожалению Rolling Curl не предоставляет явных средств для обеспечения подобных связей. Есть только один, неявный, способ: использовать HTTP-заголовок Referer. Для этого нужно при добавлении дополнительного запроса задать cURL-опцию CURLOPT_REFERER, присвоив ей URL страницы сообщения. Тогда при обработке ответа от этого запроса мы сможем получить URL его источника.

Попробуем применить этот способ на практике. Давайте перепишем наш код с учетом изменившихся условий.

Но прежде нам нужно определиться с организацией выполнения дополнительных запросов относительно основных. Есть два варианта такой организации:

  1. выполнение основных и дополнительных запросов в отдельных очередях запросов.
  2. выполнение основных и дополнительных запросов в одной очереди.

Для реализации первого варианта в нашем случае необходимо обернуть цикл добавления запросов во внешний цикл с 2-мя итерациями так, чтобы дополнительные запросы выполнялись на 2-й итерации внешнего цикла. При этом нам придется использовать два колбека для обработки ответов вместо одного (первый — для основных запросов, второй — для дополнительных), объединить операции сохранения результатов основных и дополнительных запросов в одной процедуре (функции) и поместить ее вызов в колбек для дополнительных запросов. Кроме того, нам придется ввести массивы для хранения промежуточных результатов (строк сообщений), а также для URL’ов дополнительных запросов и соответствующим им referer’ам.
В результате наш код должен принять примерно такой вид:

// список URL'ов основных запросов
$a_urls_main = array(
    'http://example.com/page1.html',
    'http://example.com/page2.html',
    // ...
);
// список URL'ов дополнительных запросов
$a_urls_xtra = array();
// список referer'ов, соответ-щих URL'ам дополнительных запросов
$a_refs_xtra = array();
// массив результатов основных запросов (строки сообщений)
$a_data = array();

// колбек для обработки ответа от каждого основного запроса
function request_callback1($response, $info, $request) {
    global $a_urls_xtra, $a_refs_xtra, $a_data;
    $msg = parse_msg($response);
    if (!$msg) return;
    $url_xtra = parse_xtra_url($response);
    if (!$url_xtra) return;
    $a_urls_xtra[] = $url_xtra;
    $a_refs_xtra[] = $request->url;
    $a_data[$request->url] = $msg;
}

// колбек для обработки ответа от каждого дополнительного запроса
function request_callback2($response, $info, $request) {
    global $a_data;
    $url = $request->options[CURLOPT_REFERER];
    add_data($url, $a_data[$url], $response);
}

function parse_msg($content) {
    // парсим сообщение из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

function parse_xtra_url($content) {
    // парсим URL запроса комментариев из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

function add_data($url, $msg, $xtra) {
    // сохраняем полученные данные (URL, сообщение и комменты)
    // как строку данных в БД или в файле
    // ...
}

$a_urls = array(
    $a_urls_main, $a_urls_xtra
);
$a_callbacks = array(
    'request_callback1', 'request_callback2'
);
// создаем объект RollingCurl и назначаем колбек для запросов
$rc = new RollingCurl();
// кол-во потоков скачивания страниц
$rc->window_size = 20;
for ($l = 0; $l < 2 && count($a_urls); $l++) {
    // назначаем колбек для запросов
    $rc->callback = $a_callbacks[$l];
    // цикл по списку URL'ов
    foreach ($a_urls[$l] as $i => $url) {
        $opts = $i? array(CURLOPT_REFERER => $a_refs_xtra[$i]) : 0;
        // добавляем запрос текущего URL в очередь
        $rc->get($url, 0, $opts);
    }
    // запускаем очередь запросов
    $rc->execute();
}

Второй вариант организации выполнения дополнительных запросов является одной из отличительных особенностей Rolling Curl. Во всяком случае, насколько мне известно, в других обертках мультикурла такая фишка отсутствует (если не считать форки самого Rolling Curl). Rolling Curl позволяет добавить новые запросы в текущую очередь запросов непосредственно во время ее выполнения/обработки так, что они будут выполняться наравне с оставшимися запросами из исходного списка. Но при этом, к сожалению, Rolling Curl не позволяет назначить индивидуальные колбеки для разных запросов. Поэтому мы должны использовать один и тот же колбек, как для основных, так и для дополнительных запросов. Это значит, что нам придется как-то определять внутри колбека, в каком контексте он был вызван (для основного или дополнительного запроса), и в зависимости от этого выполнять соответствующую обработку. Для определения контекста вызова можно использовать список URL’ов основных запросов: если колбек был вызван для основного запроса, то текущий URL должен присутствовать в данном списке, и наоборот. В результате наш код должен принять следующий вид:

// список URL'ов основных запросов
$a_urls = array(
    'http://example.com/page1.html',
    'http://example.com/page2.html',
    // ...
);
$a_data = array();

// колбек для обработки ответа от каждого запроса
function request_callback($response, $info, $request) {
    global $a_urls;
    if (in_array($request->url, $a_urls))
        request_callback1($response, $info, $request);
    else
        request_callback2($response, $info, $request);
}

// колбек для обработки ответа от каждого основного запроса
function request_callback1($response, $info, $request) {
    global $rc, $a_data;
    $msg = parse_msg($response);
    if (!$msg) return;
    $url_xtra = parse_xtra_url($response);
    if (!$url_xtra) return;
    $a_data[$request->url] = $msg;
    $rc->get($url_xtra, 0, array(CURLOPT_REFERER => $request->url));
}

// колбек для обработки ответа от каждого дополнительного запроса
function request_callback2($response, $info, $request) {
    global $a_data;
    $url = $request->options[CURLOPT_REFERER];
    add_data($url, $a_data[$url], $response);
}

function parse_msg($content) {
    // парсим сообщение из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

function parse_xtra_url($content) {
    // парсим URL запроса комментариев из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

function add_data($url, $msg, $xtra) {
    // сохраняем полученные данные (URL, сообщение и комменты)
    // как строку данных в БД или в файле
    // ...
}

// создаем объект RollingCurl и назначаем колбек для запросов
$rc = new RollingCurl('request_callback');
// кол-во потоков скачивания страниц
$rc->window_size = 20;
// цикл по списку URL'ов
foreach ($a_urls as $url) {
    // добавляем запрос текущего URL в очередь
    $rc->get($url);
}
// запускаем очередь запросов
$rc->execute();

Оба из рассмотренных вариантов организации выполнения дополнительных запросов имеют право на существование, но лично я предпочитаю последний, как более компактный и удобный (во всяком случае для меня). Поэтому в дальнейшем при решении примеров будет использоваться именно этот вариант.

Итак, казалось бы: задача решена и о проблеме связей между запросами можно спокойно забыть… Но не все так просто. Есть один важный момент, на который следует обратить внимание.

Мы решили, очевидно, не самую сложную задачу, применив при этом метод сомнительного качества — я имею в виду неявную передачу данных между запросами через HTTP-заголовки. Почему сомнительного? Потому что у нас нет гарантий того, что этот метод сработает на других, более сложных задачах. В конце концов нам может встретиться задача, в которой потребуется именно явная передача данных между запросами. И такие задачи существуют. К их числу можно отнести парсинг иерархических структур данных.

Рассмотрим такую ситуацию: мы хотим спарсить с некоего вики-подобного сайта все его статьи вместе с категориями, сохранив структуру последних. При этом мы хотим использовать все преимущества Rolling Curl и многопоточного скачивания. Но есть одна проблема: сайт имеет весьма громоздкую и разветвленную иерархию категорий (будем считать, что категории не пересекаются), а его создатели не оставили нам «хлебных крошек». Сие обстоятельство вкупе с условием многопоточности значительно осложняет нам задачу сохранения структуры категорий. Загвоздка здесь в том, что при любой реализации нам не обойтись без цепочек последовательных запросов, воспроизводящих процесс продвижения по ветвям дерева категорий. А это значит, что нам придется обеспечить непосредственную связь между всеми звеньями каждой цепочки запросов, дабы на конце цепочки (т.е. на странице отдельной статьи) иметь возможность воссоздать пройденный путь. Следовательно, без явной передачи данных здесь никак не обойтись.

Такие вот размышления навели меня на мысль о необходимости некоторых изменений в классе Rolling Curl, суть которых сводится к добавлению следующих возможностей:

  1. возможность передавать вместе с запросами пользовательские данные, необходимые в т.ч. для связи между запросами.
  2. возможность назначать индивидуальные колбеки для разных запросов.

Помимо перечисленных нововведений, после знакомства с исходным кодом Rolling Curl, я также внес несколько изменений, касающихся внешнего интерфейса класса.

Во-первых, я избавился от вспомогательных классов RollingCurlException и RollingCurlRequest, поскольку оба они фактически не нужны. Первый вообще имеет скорее декоративное назначение. Второй же используется как контейнер для данных, доступных из функции колбека: URL, метод запроса, данные POST запроса, HTTP-заголовки запроса и опции cURL. Из этих данных в колбеке может понадобиться разве что URL (да и то лишь при редиректах). Все остальные данные нужны только для самого запроса, а в обработчике ответа их использование лишено смысла. Аналогичный вывод можно применить и в отношении классов, используемых для группировки запросов (RollingCurlGroup, GroupRollingCurl, RollingCurlGroupRequest). В том смысле, что необходимость в этих классах отпадает в силу наличия возможности связывания запросов.

Во-вторых, я также избавился от метода single_curl(). Очевидно, в оригинальном классе этот метод был введен специально для случая выполнения одиночного запроса с тем, чтобы вместо многопоточной схемы выполнения запросов в этом случае использовать обычное выполнение в один поток. На мой взгляд такое поведение совершенно не оправдано, поскольку реализации многопоточной и однопоточной схем выполнения значительно отличаются друг от друга. Многопоточная схема предполагает использование колбеков для обработки ответа, тогда как в однопоточной схеме обработка ответа происходит непосредственно в месте вызова запроса. Поэтому мы не можем взять код, предназначенный для однопоточной схемы, и использовать его в многопоточной схеме без внесения дополнительных изменений, и соответственно наоборот. Из этого следует, что мы должны использовать многопоточную схему и класс Rolling Curl строго по назначению — только для выполнения множественных запросов, а также если количество подлежащих к выполнению запросов нам заранее неизвестно. В противном случае, если нам заранее известно, что количество запросов будет равно 1, мы должны обойтись без Rolling Curl и использовать обычную однопоточную схему. Таким образом, метод single_curl() оказывается избыточным.

Результатом описанных изменений стала моя собственная версия Rolling Curl — класс RollingCurlMini, исходники которого доступны на Гитхабе.

Попробуем оценить полезность данного класса на примере решения какой-нибудь задачи. В качестве такого примера можно было бы взять упомянутую задачу парсинга структуры категорий, но боюсь, что ее решение не уместится в рамках одной статьи. Вместо этого давайте лучше вернемся ко второй версии нашей исходной задачи (парсинг сообщений и комментариев) и посмотрим как будет выглядеть ее решение с использованием класса RollingCurlMini. Ниже приведен код такого решения:

// список URL'ов основных запросов
$a_urls = array(
    'http://example.com/page1.html',
    'http://example.com/page2.html',
    // ...
);
$a_data = array();

/**
 * колбек для обработки ответа от каждого основного запроса
 * @param string $response - содержимое ответа
 * @param string $url - URL запроса
 * @param array $info - информация о запросе
 * @param int $id - номер основного запроса в списке
 */
function request_callback1($response, $url, $info, $i) {
    global $rc, $a_data;
    $msg = parse_msg($response);
    if (!$msg) return;
    $url_xtra = parse_xtra_url($response);
    if (!$url_xtra) return;
    $a_data[$i] = $msg;
    $rc->add($url_xtra, 0, 0, 'request_callback2', $i);
}

/**
 * колбек для обработки ответа от каждого дополнительного запроса
 * @param string $response
 * @param string $url
 * @param array $info
 * @param int $id
 */
function request_callback2($response, $url, $info, $i) {
    global $a_urls, $a_data;
    add_data($a_urls[$i], $a_data[$i], $response);
}

/**
 * @param string $content
 */
function parse_msg($content) {
    // парсим сообщение из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

/**
 * @param string $content
 */
function parse_xtra_url($content) {
    // парсим URL запроса комментариев из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

/**
 * @param string $url
 * @param string $msg
 * @param string $xtra
 */
function add_data($url, $msg, $xtra) {
    // сохраняем полученные данные (URL, сообщение и комменты)
    // как строку данных в БД или в файле
    // ...
}

// создаем объект RollingCurlMini и 
// присваиваем кол-во потоков скачивания страниц (20)
$rc = new RollingCurlMini(20);
// цикл по списку URL'ов
foreach ($a_urls as $i => $url) {
    // добавляем запрос текущего URL в очередь
    $rc->add($url, 0, 0, 'request_callback1', $i);
}
// запускаем очередь запросов
$rc->execute();

Как видно из кода, теперь нам уже не требуется вспомогательная функция для объединения колбеков для разных типов запросов, поскольку мы можем назначать эти колбеки напрямую. Ну и самое важное, теперь мы не ограничены в средствах обеспечения связи между запросами одним лишь HTTP-заголовком Referer. С использованием класса RollingCurlMini мы можем напрямую передавать между запросами любые данные и иметь к ним доступ в обработчиках ответа. В данном случае в качестве связи используется номер основного запроса в списке, который и передается между каждым основным и его производным запросом.

На этом первая часть статьи закончена. Во второй части будут рассмотрены вопросы обработки cookies и использования прокси в условиях многопоточности запросов.

Спасибо за внимание.