Это третья статья об эмуляции многопоточности в языке PHP. Во второй статье мы применили СURL библиотеку, а теперь будем использовать stream-функции для нашей задачи.
Перевод документации:
Streams (потоки) введены в PHP, начиная с версии 4.3.0 для обобщения работы с сетью, файлами, сжатием данных, а также иными процессами, которые пользуются одним набором функций. Простое определение потока – объект ресурса, обладающий "потокообразным" поведением, из которого можно читать, в который можно записывать и внутри которого возможно перемещаться.
Функции stream_socket являются частью streams и используя их, мы можем устанавливать соединения и писать/читать данные, не ждя выполнения предыдущей операции.
У нас есть массив с URL-адресами и наша задача - непоследовательно получить их содержимое. Изучим код, решающий поставленную задачу:
// URL-адреса, содержимое которых нужно получить
$pages = array('www.google.ru', 'www.yahoo.com', 'www.yandex.ru', 'www.rambler.ru');
$read_tasks = array(); // задачи чтения
$write_tasks = array(); // задачи записи
$results = array(); // результаты
foreach ($pages as $page) {
// открываем сокет для URL
$sh = stream_socket_client($page.':80', $errno, $errstr, 10,
STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
if (!$sh) continue;
// добавляем в массив задач для записи
$write_tasks[$page] = $sh;
}
while ($write_tasks || $read_tasks) {
// массив для сокетов, имеющих возможность для чтения
$read_tasks_ = $read_tasks;
// массив для сокетов, имеющих возможность для записи
$write_tasks_ = $write_tasks;
// ждем ответ из сокетов
$n = stream_select($read_tasks_, $write_tasks_, $e=null, 10);
if ($n > 0) {
// доступные для записи сокеты
foreach ($write_tasks_ as $sh) {
// ищем URL страницы по дескриптору сокета в массиве задач записи
$page = array_search($sh, $write_tasks);
// удаляем из массива задач записи
unset($write_tasks[$page]);
// добавляем в массив задач чтения
$read_tasks[$page] = $sh;
// прописываем заголовки http
$headers = "GET / HTTP/1.0\r\n";
$headers .= "Host: ".$page."\r\n";
$headers .= "User-Agent: Mozilla/5.0 (Windows NT 5.1; rv:12.0) Gecko/20100101 Firefox/12.0\r\n";
$headers .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
$headers .= "Accept-Language: ru,en-us;q=0.7,en;q=0.3\r\n";
$headers .= "Accept-Encoding: gzip,deflate\r\n";
$headers .= "Accept-Charset: windows-1251,utf-8;q=0.7,*;q=0.7\r\n";
$headers .= "\r\n";
// записываем в сокет
if (fwrite($sh, $headers) === false) fclose($sh);
}
// сокеты, доступные для чтения
foreach ($read_tasks_ as $sh) {
// ищем URL страницы по дескриптору сокета в массиве задач чтения
$page = array_search($sh, $read_tasks);
if (!$page) continue;
// читаем результат из сокета
$result = '';
while ($r = fread($sh, 8192)) $result .= $r;
// закрываем сокет
fclose($sh);
// удаляем из массива задач чтения
unset($read_tasks[$page]);
// заносим полученное содержимое в массив результатов
$results[$page] = $result;
}
} else {
break;
}
}
В начале инициализируем массивы: $read_tasks – сюда заноситься сокеты, из которых можно читать данные, $write_tasks – массив сокетов, которые готовы к записи, и массив $results – для сохранения полученного контента с нужных нам URL-адресов.
Далее в цикле применяем функцию stream_socket_client открывая для каждого URL сокет. Флаг STREAM_CLIENT_ASYNC_CONNECT указывает, что следующее соединение надо открывать асинхронно (не ждя завершения открытия предыдущего), а флаг STREAM_CLIENT_CONNECT нужен для создания клиентского соединения. В массиве $write_tasks сохраняются дескрипторы сокетов.
Затем, в главном цикле массивам $read_tasks_ и $write_tasks_, которые передаются функции stream_select, присваиваются массивы, которые хранят сокеты для чтения и записи. stream_select функция следит за существованием данных в потоке, а также принимает следующие параметры:
- Массив дескрипторов, ожидающих завершение операции чтения;
- Массив дескрипторов, ожидающих завершения операции записи;
- Для ситуации поступления внеполосных данных - "out-of-band data";
- Таймаут.
Далее ищем, какие сокеты доступны для записи данных. Если нужно ещё что-то отправить перед окончательным ответом - удаляем текущий дескриптор из массива для записи и записываем его в массив для чтения. Затем, отправляем нужные http-заголовки для получения содержимого.
Далее, в foreach-цикле смотрим сокеты доступные для чтения, сохраняем содержимое в строку $result, удаляем дескриптор из массива для чтения и закрываем сокет. Содержимое строки $result сохраняем в массив $results. Выполняем эти операции, пока у нас есть открытые сокеты.
После исполнения скрипта полученный контент располагается в массиве $results.
Итак, мы рассмотрели решение нашей задачи с приминением функций stream_socket, в следующей статье рассмотрим асинхронные сокеты для эмуляции многопоточности в PHP.
Настоящая статья-ликбез, раскрывающая тему многопоточности в среде РНР с применением stream-функции.