shyim
6/17/2017 - 4:17 PM

MultiCurl Wrapper

MultiCurl Wrapper

<?php

/**
 * Class MultiCurl
 */
Class MultiCurl
{
    /**
     * Current curl version
     * @var float
     */
    private $_curl_version;

    /**
     * max. number of simultaneous connections allowed
     * @var int
     */
    private $_maxConcurrent = 0;

    /**
     * shared cURL options
     * @var array
     */
    private $_options = [];

    /**
     * shared cURL request headers
     * @var array
     */
    private $_headers = [];

    /**
     * default callback
     * @var null
     */
    private $_callback = null;

    /**
     * all requests must be completed by this time
     * @var int
     */
    private $_timeout = 20000;

    /**
     * request_queue
     * @var array
     */
    private $requests = [];

    /**
     * MultiCurl constructor.
     * @param int $max_concurrent
     */
    function __construct($max_concurrent = 10)
    {
        $this->setMaxConcurrent($max_concurrent);
        $this->_curl_version = curl_version()['version'];
    }

    public function setMaxConcurrent($max_requests)
    {
        if ($max_requests > 0) {
            $this->_maxConcurrent = $max_requests;
        }
    }

    public function setOptions(array $options)
    {
        $this->_options = $options;
    }

    public function setHeaders(array $headers)
    {
        if (is_array($headers) && count($headers)) {
            $this->_headers = $headers;
        }
    }

    public function setCallback(callable $callback)
    {
        $this->_callback = $callback;
    }

    public function setTimeout($timeout)
    {
        if ($timeout > 0) {
            $this->_timeout = $timeout;
        }
    }

    //Add a request to the request queue

    /**
     * Add a request to the request queue
     *
     * @param string $url
     * @param array|null $post_data
     * @param callable|null $callback
     * @param array|null $user_data
     * @param array|null $options
     * @param array|null $headers
     * @return int
     */
    public function addRequest(
        $url,
        $post_data = null,
        callable $callback = null,
        $user_data = null,
        array $options = null,
        array $headers = null
    ){
        $this->requests[] = [
            'url' => $url,
            'post_data' => ($post_data) ? $post_data : null,
            'callback' => ($callback) ? $callback : $this->_callback,
            'user_data' => ($user_data) ? $user_data : null,
            'options' => ($options) ? $options : null,
            'headers' => ($headers) ? $headers : null
        ];
        return count($this->requests) - 1;
    }

    /**
     * Reset request queue
     */
    public function reset()
    {
        $this->requests = [];
    }

    /**
     * Process all requests in queue
     */
    public function execute()
    {
        $requests_map = [];
        $multi_handle = curl_multi_init();
        $num_outstanding = 0;

        //start processing the initial request queue
        $num_initial_requests = min($this->_maxConcurrent, count($this->requests));
        for ($i = 0; $i < $num_initial_requests; $i++) {
            $this->initRequest($i, $multi_handle, $requests_map);
            $num_outstanding++;
        }
        do {
            do {
                $mh_status = curl_multi_exec($multi_handle, $active);
            } while ($mh_status == CURLM_CALL_MULTI_PERFORM);
            if ($mh_status != CURLM_OK) {
                break;
            }
            //a request is just completed, find out which one
            while ($completed = curl_multi_info_read($multi_handle)) {
                $this->processRequest($completed, $multi_handle, $requests_map);
                $num_outstanding--;
                //try to add/start a new requests to the request queue
                while (
                    $num_outstanding < $this->_maxConcurrent && //under the limit
                    $i < count($this->requests) && isset($this->requests[$i]) // requests left
                ) {
                    $this->initRequest($i, $multi_handle, $requests_map);
                    $num_outstanding++;
                    $i++;
                }
            }
            usleep(15); //save CPU cycles, prevent continuous checking
        } while ($active || count($requests_map));
        $this->reset();
        curl_multi_close($multi_handle);
    }

    /**
     * @param array $request
     * @return array|mixed
     */
    private function buildOptions(array $request)
    {
        $url = $request['url'];
        $post_data = $request['post_data'];
        $individual_opts = $request['options'];
        $individual_headers = $request['headers'];
        $options = ($individual_opts) ? $individual_opts + $this->_options : $this->_options;
        $headers = ($individual_headers) ? $individual_headers + $this->_headers : $this->_headers;
        $options[CURLOPT_RETURNTRANSFER] = true;
        $options[CURLOPT_NOSIGNAL] = 1;

        if (version_compare($this->_curl_version, '7.16.2') >= 0) {
            $options[CURLOPT_CONNECTTIMEOUT_MS] = $this->_timeout;
            $options[CURLOPT_TIMEOUT_MS] = $this->_timeout;
            unset($options[CURLOPT_CONNECTTIMEOUT]);
            unset($options[CURLOPT_TIMEOUT]);
        } else {
            $options[CURLOPT_CONNECTTIMEOUT] = round($this->_timeout / 1000);
            $options[CURLOPT_TIMEOUT] = round($this->_timeout / 1000);
            unset($options[CURLOPT_CONNECTTIMEOUT_MS]);
            unset($options[CURLOPT_TIMEOUT_MS]);
        }
        if ($url) {
            $options[CURLOPT_URL] = $url;
        }
        if ($headers) {
            $options[CURLOPT_HTTPHEADER] = $headers;
        }

        if ($post_data !== null) {
            $options[CURLOPT_POST] = 1;
            $options[CURLOPT_POSTFIELDS] = is_array($post_data) ? http_build_query($post_data) : $post_data;
        }

        return $options;
    }

    /**
     * @param $request_num
     * @param $multi_handle
     * @param $requests_map
     */
    private function initRequest($request_num, $multi_handle, &$requests_map)
    {
        $request =& $this->requests[$request_num];
        $this->addTimer($request);
        $ch = curl_init();
        $options = $this->buildOptions($request);
        $request['options_set'] = $options;
        curl_setopt_array($ch, $options);
        curl_multi_add_handle($multi_handle, $ch);
        $ch_hash = (string)$ch;
        $requests_map[$ch_hash] = $request_num;
    }

    /**
     * @param $completed
     * @param $multi_handle
     * @param array $requests_map
     */
    private function processRequest($completed, $multi_handle, array &$requests_map)
    {
        $ch = $completed['handle'];
        $ch_hash = (string)$ch;
        $request =& $this->requests[$requests_map[$ch_hash]];
        $request_info = curl_getinfo($ch);
        $request_info['curle'] = $completed['result'];
        $request_info['handle'] = $ch;
        $request_info['time'] = $time = $this->stopTimer($request);
        $request_info['url_raw'] = $url = $request['url'];
        $request_info['user_data'] = $user_data = $request['user_data'];

        if (curl_errno($ch) !== 0 || intval($request_info['http_code']) !== 200) {
            $response = false;
        } else {
            $response = curl_multi_getcontent($ch);
        }

        $callback = $request['callback'];
        $options = $request['options_set'];
        if ($response && !empty($options[CURLOPT_HEADER])) {
            $k = intval($request_info['header_size']);
            $request_info['response_header'] = substr($response, 0, $k);
            $response = substr($response, $k);
        }
        unset($requests_map[$ch_hash]);
        curl_multi_remove_handle($multi_handle, $ch);

        if ($callback) {
            call_user_func($callback, $response, $url, $request_info, $user_data, $time);
        }
        $request = null;
    }

    /**
     * @param array $request
     */
    private function addTimer(array &$request)
    {
        $request['timer'] = microtime(true);
        $request['time'] = false;
    }

    /**
     * @param array $request
     * @return mixed
     */
    private function stopTimer(array &$request)
    {
        $elapsed = $request['timer'] - microtime(true);
        $request['time'] = $elapsed;
        unset($request['timer']);
        return $elapsed;
    }
}