Это третья статья об эмуляции многопоточности в языке PHP. Во второй статье мы применили СURL библиотеку, а теперь будем использовать stream-функции для нашей задачи.

Перевод документации:

Streams (потоки) введены в PHP, начиная с версии 4.3.0 для обобщения работы с сетью, файлами, сжатием данных, а также иными процессами, которые пользуются одним набором функций. Простое определение потока – объект ресурса, обладающий "потокообразным" поведением, из которого можно читать, в который можно записывать и внутри которого возможно перемещаться.

Функции stream_socket являются частью streams и используя их, мы можем устанавливать соединения и писать/читать данные, не ждя выполнения предыдущей операции.

У нас есть массив с URL-адресами и наша задача - непоследовательно получить их содержимое. Изучим код, решающий поставленную задачу:


// URL-адреса, содержимое которых нужно получить
$pages = array('www.google.ru', 'www.yahoo.com', 'www.yandex.ru', 'www.rambler.ru');

$read_tasks = array(); // задачи чтения
$write_tasks = array(); // задачи записи
$results = array(); // результаты

foreach ($pages as $page) {

	// открываем сокет для URL
	$sh = stream_socket_client($page.':80', $errno, $errstr, 10,
		STREAM_CLIENT_ASYNC_CONNECT|STREAM_CLIENT_CONNECT);
	if (!$sh) continue;

	// добавляем в массив задач для записи
	$write_tasks[$page] = $sh;
}

while ($write_tasks || $read_tasks) {

	// массив для сокетов, имеющих возможность для чтения
	$read_tasks_ = $read_tasks;

	// массив для сокетов, имеющих возможность для записи
	$write_tasks_ = $write_tasks;

	// ждем ответ из сокетов
	$n = stream_select($read_tasks_, $write_tasks_, $e=null, 10);

	if ($n > 0) {

		// доступные для записи сокеты
		foreach ($write_tasks_ as $sh) {

			// ищем URL страницы по дескриптору сокета в массиве задач записи
			$page = array_search($sh, $write_tasks);

			// удаляем из массива задач записи
			unset($write_tasks[$page]);

			// добавляем в массив задач чтения
			$read_tasks[$page] = $sh;

			// прописываем заголовки http
			$headers  = "GET / HTTP/1.0\r\n";
			$headers .= "Host: ".$page."\r\n";
			$headers .= "User-Agent: Mozilla/5.0 (Windows NT 5.1; rv:12.0) Gecko/20100101 Firefox/12.0\r\n";
			$headers .= "Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8\r\n";
			$headers .= "Accept-Language: ru,en-us;q=0.7,en;q=0.3\r\n";
			$headers .= "Accept-Encoding: gzip,deflate\r\n";
			$headers .= "Accept-Charset: windows-1251,utf-8;q=0.7,*;q=0.7\r\n";
			$headers .= "\r\n";

			// записываем в сокет
			if (fwrite($sh, $headers) === false) fclose($sh);

		}

		// сокеты, доступные для чтения
		foreach ($read_tasks_ as $sh) {

			// ищем URL страницы по дескриптору сокета в массиве задач чтения
			$page = array_search($sh, $read_tasks);
			if (!$page) continue;

			// читаем результат из сокета
			$result = '';
			while ($r = fread($sh, 8192)) $result .= $r;

			// закрываем сокет
			fclose($sh);

			// удаляем из массива задач чтения
			unset($read_tasks[$page]);

			// заносим полученное содержимое в массив результатов
			$results[$page] = $result;

		}

	} else {
		break;
	}

}

В начале инициализируем массивы: $read_tasks – сюда заноситься сокеты, из которых можно читать данные, $write_tasks – массив сокетов, которые готовы к записи, и массив $results – для сохранения полученного контента с нужных нам URL-адресов.

Далее в цикле применяем функцию stream_socket_client открывая для каждого URL сокет. Флаг STREAM_CLIENT_ASYNC_CONNECT указывает, что следующее соединение надо открывать асинхронно (не ждя завершения открытия предыдущего), а флаг STREAM_CLIENT_CONNECT нужен для создания клиентского соединения. В массиве $write_tasks сохраняются дескрипторы сокетов.

Затем, в главном цикле массивам $read_tasks_ и $write_tasks_, которые передаются функции stream_select, присваиваются массивы, которые хранят сокеты для чтения и записи. stream_select функция следит за существованием данных в потоке, а также принимает следующие параметры:

  1. Массив дескрипторов, ожидающих завершение операции чтения;
  2. Массив дескрипторов, ожидающих завершения операции записи;
  3. Для ситуации поступления внеполосных данных - "out-of-band data";
  4. Таймаут.
Массивы по указателям отправляются в функцию, и после возвращения функцией числа > 0, в них будут сохранены сокеты, готовые к дальнейшим манипуляциям.

Далее ищем, какие сокеты доступны для записи данных. Если нужно ещё что-то отправить перед окончательным ответом - удаляем текущий дескриптор из массива для записи и записываем его в массив для чтения. Затем, отправляем нужные http-заголовки для получения содержимого.

Далее, в foreach-цикле смотрим сокеты доступные для чтения, сохраняем содержимое в строку $result, удаляем дескриптор из массива для чтения и закрываем сокет. Содержимое строки $result сохраняем в массив $results. Выполняем эти операции, пока у нас есть открытые сокеты.

После исполнения скрипта полученный контент располагается в массиве $results.

Итак, мы рассмотрели решение нашей задачи с приминением функций stream_socket, в следующей статье рассмотрим асинхронные сокеты для эмуляции многопоточности в PHP.

Комментарии

captcha
 6 авг. 2012 Ответить

Настоящая статья-ликбез, раскрывающая тему многопоточности в среде РНР с применением stream-функции.