ebylund
3/17/2016 - 7:55 PM

AWS Kinesis Example for PHP (using the AWS SDK for PHP)

Two standalone scripts: a producer that batches JSON records into PutRecords calls, and a consumer that reads the stream's shards with GetRecords.

<?php

// Producer: writes $totalNumberOfRecords JSON records to a Kinesis stream in batches.
//
// Install the AWS SDK for PHP with Composer:
//   curl -sS https://getcomposer.org/installer | php
//   php composer.phar require aws/aws-sdk-php
//
// Provide credentials through the environment before running:
//   export AWS_ACCESS_KEY_ID=...
//   export AWS_SECRET_ACCESS_KEY=...

require_once 'vendor/autoload.php';

// Configuration
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';  // target stream; this script does not create it
$totalNumberOfRecords = 10000;                  // how many records to write in total

$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region'  => 'eu-west-1',
    'version' => '2013-12-02', // Kinesis API version
]);


/**
 * Collects items and hands them to a callback in batches.
 *
 * The callback is invoked with the list of collected items whenever the number of
 * buffered items reaches $size, and again for any remainder when flush() is called
 * explicitly. The buffer is emptied only after the callback returns, so items stay
 * buffered if the callback throws.
 */
class Buffer
{
    /** @var callable Receives each batch as an array of items. */
    protected $callback;

    /** @var int Item count at which add() triggers an automatic flush. */
    protected $size;

    /** @var array Items collected since the last flush/reset. */
    protected $data = [];

    /**
     * @param callable $callback Called as $callback(array $batch).
     * @param int      $size     Batch size that triggers an automatic flush.
     */
    public function __construct($callback, $size=500)
    {
        $this->callback = $callback;
        $this->size = $size;
    }

    /**
     * Appends one item and flushes if the batch is now full.
     *
     * @param mixed $item
     */
    public function add($item)
    {
        $this->data[] = $item;
        $isFull = count($this->data) >= $this->size;
        if ($isFull) {
            $this->flush();
        }
    }

    /**
     * Discards all buffered items without invoking the callback.
     */
    public function reset()
    {
        $this->data = [];
    }

    /**
     * Passes the buffered items to the callback and clears the buffer.
     * Does nothing when the buffer is empty.
     */
    public function flush()
    {
        if ($this->data === []) {
            return;
        }
        call_user_func($this->callback, $this->data);
        $this->reset();
    }
}

/*
 * Flush callback: sends one buffered batch to Kinesis with PutRecords.
 *
 * PutRecords is not all-or-nothing: the request can succeed while individual
 * entries fail (FailedRecordCount > 0, each failed entry carrying an ErrorCode,
 * e.g. throttling). The response Records list lines up index-for-index with the
 * request, so only the failed entries are resent, with exponential backoff, for at
 * most $maxAttempts PutRecords calls per batch. Anything still failing after that
 * is reported and dropped.
 */
$buffer = new Buffer(function(array $data) use ($kinesisClient, $streamName) {
    echo "Flushing\n";

    $records = [];
    foreach ($data as $item) {
        $records[] = [
            'Data' => $item,
            // Different payloads get different partition keys, spreading records across shards.
            'PartitionKey' => md5($item),
        ];
    }

    $maxAttempts = 5;
    $attempt = 0;
    // The loop condition also prevents a PutRecords call with an empty Records list.
    while ($records !== []) {
        $attempt++;
        $res = $kinesisClient->putRecords([
            'StreamName' => $streamName,
            'Records' => $records,
        ]);
        echo "Failed records: {$res->get('FailedRecordCount')}\n";
        if ((int) $res->get('FailedRecordCount') === 0) {
            break;
        }

        // Keep only the entries whose result carries an ErrorCode.
        $failed = [];
        foreach (($res->get('Records') ?: []) as $index => $result) {
            if (isset($result['ErrorCode'])) {
                $failed[] = $records[$index];
            }
        }
        $records = $failed;

        if ($records !== [] && $attempt >= $maxAttempts) {
            echo "Giving up on " . count($records) . " records after $attempt attempts\n";
            break;
        }
        if ($records !== []) {
            // Back off 100ms, 200ms, 400ms, ... before resending the failed entries.
            usleep(100000 * (1 << ($attempt - 1)));
        }
    }
});

// Generate $totalNumberOfRecords small JSON documents; the buffer sends them in batches.
$startTime = microtime(true);
for ($i = 0; $i < $totalNumberOfRecords; $i++) {
    $buffer->add(json_encode([
        'id' => rand(0, 10000),
        'title' => 'Foo',
    ]));
}
// Send whatever is left in the last, partially filled batch.
$buffer->flush();

$duration = microtime(true) - $startTime;

echo "Total Duration: " . round($duration) . " seconds\n";
// Guard the average: a total of 0 records would otherwise divide by zero.
if ($totalNumberOfRecords > 0) {
    $timePerMessage = $duration*1000 / $totalNumberOfRecords;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
} else {
    echo "No records written\n";
}
<?php

// Consumer: reads records from a Kinesis stream shard by shard and prints them.
//
// Install the AWS SDK for PHP with Composer:
//   curl -sS https://getcomposer.org/installer | php
//   php composer.phar require aws/aws-sdk-php
//
// Provide credentials through the environment before running:
//   export AWS_ACCESS_KEY_ID=...
//   export AWS_SECRET_ACCESS_KEY=...

require_once 'vendor/autoload.php';

// Configuration
$streamName = '<INSERT_YOUR_STREAMNAME_HERE>';  // stream to read from
$numberOfRecordsPerBatch = 10000;               // 'Limit' passed to each GetRecords call

$sdk = new \Aws\Sdk();
$kinesisClient = $sdk->createKinesis([
    'region'  => 'eu-west-1',
    'version' => '2013-12-02', // Kinesis API version
]);

// Collect the ids of all shards in the stream. DescribeStream returns shards one page
// at a time (StreamDescription.HasMoreShards), so keep paging from the last shard id
// seen until no further page is reported.
$shardIds = [];
$describeParameters = ['StreamName' => $streamName];
do {
    $res = $kinesisClient->describeStream($describeParameters);
    $pageShardIds = $res->search('StreamDescription.Shards[].ShardId') ?: [];
    $shardIds = array_merge($shardIds, $pageShardIds);
    // Also stop on an empty page so an unexpected response cannot loop forever.
    $hasMoreShards = $res->search('StreamDescription.HasMoreShards') && $pageShardIds !== [];
    if ($hasMoreShards) {
        $describeParameters['ExclusiveStartShardId'] = end($pageShardIds);
    }
} while ($hasMoreShards);

$count = 0;
$startTime = microtime(true);
foreach ($shardIds as $shardId) {
    echo "ShardId: $shardId\n";

    // get initial shard iterator
    $res = $kinesisClient->getShardIterator([
        'ShardId' => $shardId,
        'ShardIteratorType' => 'TRIM_HORIZON', // 'AT_SEQUENCE_NUMBER|AFTER_SEQUENCE_NUMBER|TRIM_HORIZON|LATEST'
        // 'StartingSequenceNumber' => '<string>',
        'StreamName' => $streamName,
    ]);
    $shardIterator = $res->get('ShardIterator');

    // Read batches until this shard has nothing more to give. An empty batch on its own
    // is not a reliable end marker (GetRecords can return no records while data is still
    // ahead), so only stop on an empty batch once caught up (MillisBehindLatest == 0).
    // A null NextShardIterator means the shard is closed and fully read; it must not be
    // passed back to GetRecords.
    while ($shardIterator !== null) {
        echo "Get Records\n";
        $res = $kinesisClient->getRecords([
            'Limit' => $numberOfRecordsPerBatch,
            'ShardIterator' => $shardIterator,
        ]);
        $shardIterator = $res->get('NextShardIterator');
        $localCount = 0;
        foreach (($res->search('Records[].[SequenceNumber, Data]') ?: []) as $data) {
            list($sequenceNumber, $item) = $data;
            echo "- [$sequenceNumber] $item\n";
            $count++;
            $localCount++;
        }
        echo "Processed $localCount records in this batch\n";
        if ($localCount === 0 && (int) $res->get('MillisBehindLatest') === 0) {
            break;
        }
        // GetRecords allows a limited number of calls per second per shard; pause between calls.
        sleep(1);
    }
}

$duration = microtime(true) - $startTime;

echo "Total Duration: " . round($duration) . " seconds\n";
// Guard the average: an empty stream would otherwise divide by zero.
if ($count > 0) {
    $timePerMessage = $duration*1000 / $count;
    echo "Time per message: " . round($timePerMessage, 2) . " ms/message\n";
} else {
    echo "No records read\n";
}