Многопоточный сбор данных с использованием цепочек связанных cURL-запросов. Часть 2

Первую часть статьи читайте здесь.

До сих пор мы еще никак не касались вопросов обработки cookies и использования прокси в условиях многопоточности запросов. Пора исправить эту оплошность и уделить внимание этим вопросам, имеющим большое практическое значение при разработке парсеров.

В качестве примера будем рассматривать все ту же задачу парсинга сообщений и комментариев. Пойдем дальше по пути ее усложнения. Предположим, что запрос блока комментариев к сообщению не может быть сделан без предварительного чтения этого сообщения. Технически это ограничение выражается в том, что при запросе блока комментариев сервер требует от клиента наличия специфичных cookies, устанавливаемых при чтении соответствующего сообщения. Применительно к нашей задаче это означает необходимость эмуляции обработки cookies, причем эта эмуляция должна осуществляться раздельно для каждой пары запросов «сообщение — комментарии».

В cURL эмуляция обработки cookies не представляет особой сложности. Достаточно присвоить опциям CURLOPT_COOKIEFILE и CURLOPT_COOKIEJAR имя файла для хранения cookies, и всю работу по обработке cookies возьмет на себя cURL. Но это касается только линейной схемы запросов, когда все запросы выполняются последовательно и в одном потоке. Многопоточная схема запросов в этом плане имеет свои особенности. В этом случае может быть два варианта:

  1. все запросы равны между собой, так что порядок их выполнения не имеет значения.
  2. все запросы разбиваются на цепочки последовательных запросов, каждая из которых выполняется в строго заданном порядке.

Автоматическая обработка cookies имеет смысл только во втором случае, к которому относится и наша задача. При этом процедура настройки cURL для обработки cookies в данном случае имеет некоторые отличия от стандартной схемы. Суть отличий состоит в том, что каждой цепочке запросов должен соответствовать свой индивидуальный файл для хранения cookies.

Применим автоматическую обработку cookies к нашей задаче. Нам нужно для каждой пары запросов «сообщение — комментарии» отдельно устанавливать опции CURLOPT_COOKIEFILE / CURLOPT_COOKIEJAR, присваивая им имя (+путь) индивидуального файла, где будут храниться cookies в рамках данной пары запросов. Для задания имени индивидуального файла нам придется использовать какое-нибудь уникальное значение (идентификатор), однозначно определяющее соответствующую пару запросов. При этом мы должны иметь доступ к этому значению из обоих запросов, составляющих пару. Это значит, что нам опять потребуется связь между запросами из одной пары. В первой части статьи мы уже установили такую связь, используя для этого номер основного запроса. Его же мы можем использовать и в рассматриваемой задаче в качестве идентификатора для каждой пары запросов.

Итак, мы выяснили как решить проблему эмуляции обработки cookies. Но это еще не все, что можно сделать в плане эмуляции. «Хороший» парсер должен уметь «прикидываться» обычным посетителем/пользователем сайта или хотя бы делать все возможное, чтобы избежать подозрений. В нашем случае есть одно обстоятельство, которое может «выдать» наш парсер. Речь конечно же идет об IP-адресах, которые являются основным объектом при анализе статистики обращений к сайту. В текущей реализации все запросы нашего парсера будут идти от одного и того же IP-адреса. С учетом использования многопоточной схемы запросов это неизбежно приводит к ситуации, когда запросы из разных пар выполняются одновременно. Естественно, с точки зрения сервера такая ситуация выглядит подозрительной: обычный посетитель не может одновременно читать несколько сообщений (да еще и впридачу комментарии к ним). Понятно, что это может быть только бот.

Для того, чтобы избежать таких подозрений, необходимо использовать разные IP для запросов в пределах одного буфера выполнения, т.е. для всех запросов, которые могут выполняться одновременно. В многопоточной схеме запросов наибольшее число запросов, которые могут выполняться одновременно, определяется количеством потоков. Выполнение запросов от разных IP-адресов осуществляется с помощью прокси-серверов. Раскрытие темы прокси и принципов работы с ними выходит за рамки данной статьи. Здесь мы коснемся только основных моментов, имеющих прямое отношение к нашей задаче.

Предположим, что у нас есть список HTTP прокси, которые мы хотим использовать в нашем парсере. Для задействования прокси в cURL-запросах служит опция CURLOPT_PROXY, которая принимает в качестве значения IP-адрес и порт прокси-сервера. Применительно к нашей задаче, для каждого запроса в пределах одного буфера выполнения мы должны использовать свой уникальный прокси-сервер. Это означает, что размер буфера выполнения (количество потоков) не должен превышать количество прокси-серверов. Кроме того, из условий задачи следует, что каждый запрос комментариев должен использовать тот же прокси, который использовался в соответствующем запросе сообщения. Другими словами, при запросе сообщения мы должны запоминать используемый в нем прокси с тем, чтобы потом использовать этот прокси в производном запросе комментариев.

Теперь у нас есть все необходимое для реализации требуемого парсера. Очевидно, что это потребует большего, чем прежде, числа используемых данных. Поэтому было бы не лишним перейти к ООП-стилю.

С учетом всего перечисленного исходный код реализации требуемого парсера будет иметь следующий вид:

class Scraper
{

const N_THREADS = 10;
const N_MAXLOOPS = 3;
const FILE_COOKIE = 'cookie-%03d.txt';

static protected $A_CURL_OPTS = array(
    CURLOPT_NOBODY => 0,
    CURLOPT_RETURNTRANSFER => 1,
    CURLOPT_SSL_VERIFYPEER => 0,
    CURLOPT_SSL_VERIFYHOST => 0,
    CURLOPT_FOLLOWLOCATION => true,
    CURLOPT_AUTOREFERER => true,
    CURLOPT_MAXREDIRS => 3,
    CURLOPT_USERAGENT => 'Mozilla/5.0 Gecko/20100101 Firefox/12.0',
    CURLOPT_TIMEOUT => 20,
    CURLOPT_CONNECTTIMEOUT => 15,
    CURLINFO_HEADER_OUT => true,
);

/**
 * @var array - исходный список URL'ов страниц сообщений
 */
static protected $A_URLS = array(
    'http://example.com/page/1/',
    'http://example.com/page/4/',
    'http://example.com/page/5/',
);

/**
 * @var array - исходный список прокси (здесь они намеренно невалидные)
 */
static protected $A_PROXIES = array(
    '500.500.500.1:80',
    '500.500.500.2:80',
    '500.500.500.3:80',
);

/**
 * @var object - объект класса RollingCurlMini
 */
protected $oMc = 0;

/**
 * @var string - шаблон имени индивидуального файла cookies
 */
protected $sCookieFile = '';

/**
 * @var array - массив для хранения спарсенных данных
 */
protected $aData = array();

/**
 * @var array - массив используемых прокси
 */
protected $aProxies = array();


public function __construct() {
    $this->sCookieFile = dirname(__FILE__). '/'. self::FILE_COOKIE;
}

/**
 * Процесс парсинга
 * @param int $nThreads - кол-во потоков
 */
public function run($nThreads = 0) {
    $this->initData();
    if (!$this->aData) return;
    $this->initProxies();
    $nThreads = min(
        $nThreads? $nThreads : self::N_THREADS,
        count($this->aProxies)
    );
    $this->oMc = new RollingCurlMini($nThreads);
    $this->oMc->setOptions(self::$A_CURL_OPTS);
    for ($l = 0; $l < N_MAXLOOPS && count($this->aData); $l++) {
        foreach ($this->aData as $id => $a_item) {
            $this->oMc->add(
                $url, 0, array($this, 'handleMsg'), $id,
                $this->buildItemOptions($id, $a_item['url'], true)
            );
        }
        $this->oMc->execute();
    }
}

/**
 * Инициализация свойства aData
 */
protected function initData() {
    $this->aData = array();
    foreach (self::$A_URLS as $id => $url)
        $this->aData[$id] = array('url' => $url);
}

/**
 * Инициализация свойства aProxies
 */
protected function initProxies() {
    $this->aProxies = self::$A_PROXIES;
}

/**
 * Колбек для обработки ответа от запроса страницы сообщения
 * @param string $content - содержимое ответа
 * @param string $url - URL запроса
 * @param array $aInfo - информация о запросе
 * @param int $id - номер страницы сообщения
 */
public function handleMsg($content, $url, $aInfo, $id) {
    if (!isset($this->aData[$id])) return;
    if (!$content) {
        $this->excludeItemProxy($id);
        return;
    }
    $msg = $this->parseMsg($content);
    if (!$msg) return;
    $this->aData[$id]['msg'] = $msg;
    $url_xtra = $this->parseXtraUrl($content);
    if (!$url_xtra) return;
    $this->oMc->add(
        $url_xtra, 0, array($this, 'handleXtra'), $id,
        $this->buildItemOptions($id, $url)
    );
}

/**
 * Парсинг страницы сообщения
 * @param int $id - номер страницы сообщения
 * @param string $content - содержимое страницы сообщения
 */
protected function parseMsg($id, $content) {
    // парсим сообщение из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

/**
 * Парсинг URL запроса комментариев
 * @param string $content - содержимое страницы сообщения
 */
protected function parseXtraUrl($content) {
    // парсим URL запроса комментариев из содержимого страницы
    // и записываем результат в переменную $result
    // ...
    return $result;
}

/**
 * Колбек для обработки ответа от запроса комментариев
 * @param string $content
 * @param string $url
 * @param array $aInfo
 * @param int $id - номер соотв-щей страницы сообщения
 */
public function handleXtra($content, $url, $aInfo, $id) {
    if (!isset($this->aData[$id])) return;
    if (!$content) return;
    $r_item = &$this->aData[$id];
    $this->addDataRow($r_item['url'], $r_item['msg'], $content);
    unset($this->aData[$id]);
}

/** Сохранение строки данных
 * @param string $url
 * @param string $msg
 * @param string $xtra
 */
function addDataRow($url, $msg, $xtra) {
    // сохраняем полученные данные (URL, сообщение и комменты)
    // как строку данных в БД или в файле
    // ...
}

/**
 * Составление cURL-опций для запроса из заданной пары запросов
 * @param int $id - номер страницы сообщения, соотв-щей заданной паре
 * @param string $url - URL запроса
 * @param bool $b1st - является ли данный запрос первым в своей паре
 * @return array - массив cURL-опций
 */
protected function buildItemOptions($id, $url, $b1st = false) {
    if (!isset($this->aData[$id])) return false;
    $r_item = &$this->aData[$id];
    $s_cookie = sprintf($this->sCookieFile, $id);
    if ($b1st && file_exists($s_cookie)) unlink($s_cookie);
    $a_opts = array(
        CURLOPT_COOKIEFILE => $s_cookie, CURLOPT_COOKIEJAR => $s_cookie
    );
    if ($n = count($this->aProxies)) {
        if ($b1st)
            $r_item['i_proxy'] = mt_rand(0, $n- 1);
        $a_opts[CURLOPT_PROXY] = $this->aProxies[$r_item['i_proxy']];
    }
    if ($r_item['url_prev'])
        $a_opts[CURLOPT_REFERER] = $r_item['url_prev'];
    $r_item['url_prev'] = $url;
    return $a_opts;
}

/**
 * Исключение прокси, использованного в заданной паре запросов, 
 * из списка прокси
 * @param int $id - номер страницы сообщения, соотв-щей заданной паре
 */
protected function excludeItemProxy($id) {
    if (!isset($this->aData[$id]) || !$this->aData[$id]['i_proxy'])
        return;
    $i = $this->aData[$id]['i_proxy'];
    if (!isset($this->aProxies[$i]))
        return;
    unset($this->aProxies[$i]);
    $this->aProxies = array_values($this->aProxies);
}

}

$o_scraper = new Scraper();
$o_scraper->run();

На этом все. Спасибо за внимание.

Многопоточный сбор данных с использованием цепочек связанных cURL-запросов. Часть 2: 10 комментариев

  1. Не хватает репарсинга — при работе с прокси это критично.

    1. Если вы имели в виду повторное выполнение неудачно завершившихся запросов, то это есть в приведенном мной примере. Если вы внимательно читали его код, то должны были обратить внимание на то, что основной цикл добавления/выполнения запросов обернут во внешний цикл:
      for ($l = 0; $l < N_MAXLOOPS && count($this->aData); $l++) {...}

      Это означает, что после завершения основного цикла, он будет повторяться максимум N_MAXLOOPS-1 раз до тех пор, пока массив $this->aData не станет пустым. Этот массив содержит данные незавершенных запросов. При удачном завершении запроса он удаляется из массива $this->aData. Таким образом, каждый неудачно завершившийся запрос будет выполнен повторно до удачного завершения, либо пока число повторов не превысит N_MAXLOOPS-1.

  2. Приветствую! Подскажите пожалуйста на примере, как реализовать через данный класс такую задачу по цепочке:
    1. Авторизация;
    2. переход по ссылке;
    3. пост.
    при этом, у каждого потока должны быть свои прокси, куки и юзер-агенты.

    1. Да точно так же, как и в рассмотренном здесь примере. Отличие только в том, что у вас цепочка будет длиннее. А так, принцип тот же самый: на первом запросе получаете начальную страницу, потом делаете производный запрос (в вашем случае — запрос авторизации), используя при этом те же куки, прокси и юзерагент, что и в первом запросе, и т.д…
      Статья содержит все необходимое, чтобы ее можно было использовать как практическое пособие. Все остальное — в ваших руках.
      Если же вы ждете, что я напишу отдельную статью с решением специально для вашего случая, то это напрасно. Это уже повод для оплачиваемой работы.

  3. Здравствуйте! Есть два вопроса.

    1. Объясните, пожалуйста, смысл конструкции
    while (($code = curl_multi_exec($hcm, $flag)) == CURLM_CALL_MULTI_PERFORM) ;
    Это 146 строка в RollingCurlMini.php. Здесь же нет собственно цикла. Тем не менее, я знаю, что это работает, т.к. также присутствует и в классе RollingCurl.

    2. Вы использовали условие завершения цикла:

    do {

    } while (($flag || count($a_reqs_map)) && !$this->bBreak);

    Но переменная $this->bBreak — всегда false, т.к. true она приобретает лишь при вызове публичной ф-ции requestBreak(), которая в вашем классе нигде не вызывается. Эта ф-ция создана «на всякий случай» для вызова из объекта?

    1. Спасибо за вопросы.

      1. Цикл здесь есть (while), но объяснить его смысл я к сожалению не смогу. Если честно, я и сам не совсем понимаю, как именно он работает. Я просто скопировал этот код из оригинала (Rolling Curl). Кстати, этот же код используется и в мануале, но никаких объяснений там, естес-но, нет.

      2. Да, в самом классе переменная $this->bBreak не используется. Она предназначена для внешнего использования в том случае, если нужно прервать все незавершенные запросы. Конечно, для этого можно использовать исключения: обернуть код выполнения запросов в блок try.. catch и там где необходимо бросать исключение (throw). Но в таком случае память, выделенная под незавершенные запросы, не будет освобождена и эти запросы так и будут висеть до самого конца выполнения скрипта. Чем больше число потоков, тем больше лишней памяти будет расходовано. Поэтому я и ввел эту переменную — чтобы «правильно» завершать текущую очередь запросов. Для этого нужно вызвать метод requestBreak. Предполагается, что это будет делаться внутри колбека обработки ответа. Я обычно использую это для тестирования работы парсеров, если объем данных на сайте очень велик. Тогда я ставлю ограничение на число успешно обработанных страниц данных, напр-р, не больше 10. Когда это число достигается, я вызываю requestBreak и весь цикл парсинга завершается. При этом общее число спарсенных страниц может быть гораздо больше, т.к. не все страницы могут содержать нужные данные. К тому же, если данные на сайте разбиты по категориям, то, чтобы добраться до нужных данных, придется скачивать/парсить страницы категорий.

  4. Благодарю.

    Да, я уже понял, как можно использовать requestBreak(). Засунул его в ф-цию, отвечающую за смену прокси и удаление предыдущей нерабочей: если число прокси становится меньше числа потоков, вызывается прерывание и сообщение об ошибке.

    Кстати, у вас здесь, видимо, ошибка — наверно, имелся в виду return, а не continue:

    public function handleMsg($content, $url, $aInfo, $id) {

    if (!$msg) continue;

    if (!$url_xtra) continue;

    }

    Хочу еще спросить, верной ли будет такая логика. Мне нужно последовательно обрабатывать n-е к-ство урлов. Если я буду с каждого коллбека вызывать метод add с передачей следующего урла и соответствующего ему коллбека, работать должно правильно?

    И последний вопрос. Не совсем понимаю смысл использования вами проверки в коллбеках «if (!isset($this->aData[$id])) return;». А в каком случае такое может произойти? Ведь «unset($this->aData[$id]);» происходит только тогда, когда вся цепочка урлов успешно обработана.

    P.S. И да, большое спасибо за скрипт! Он очень хорошо подошел для моих целей.

    1. >> Кстати, у вас здесь, видимо, ошибка.
      Спасибо. Это в самом деле ошибка. Уже поправил.

      >> Если я буду с каждого коллбека вызывать метод add с передачей следующего урла и соответствующего ему коллбека, работать должно правильно?
      Да, конечно, это будет работать. Как я писал в 1-й части: «Rolling Curl позволяет добавить новые запросы в текущую очередь запросов непосредственно во время ее выполнения/обработки так, что они будут выполняться наравне с оставшимися запросами из исходного списка». Эта фишка перекочевала из оригинального Rolling Curl.

      >> Не совсем понимаю смысл использования вами проверки в коллбеках
      Это просто перестраховка, на всякий случай. Я привык перепроверять входные данные) (в данном случае аргумент функции).

  5. Спасибо за скрипт.
    Некоторые сайты очень хорошо обрабатывает.Использую список из 200 прокси. В принципе все рабочие.
    Но вот попался один сайт со множеством редиректов. Во-первых рабочие прокси практически перестали срабатывать, т.е.очень часто ответ 0. Но и при ответе 302 тоже не проходит.
    Всвязи с этим вопрос: каким образом вашим скриптом можно более корректно обрабатывать редиректы.

    1. Примерно так же, как и при обычных CURL-запросах: с помощью опций CURLOPT_FOLLOWLOCATION, CURLOPT_MAXREDIRS и CURLOPT_AUTOREFERER. Только их нужно не устан-ть вручную функцией curl_setopt, а нужно передать их массивом опций, либо в метод add — 5-м параметром (если вы хотите применить их только для одного запроса), либо в метод setOptions (тогда они будут использованы для всех запросов).

Комментарии запрещены.