#pragma once
#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"
//Ring buffer size 必须是2的N次方
// disruptor::kDefaultRingBufferSize;
constexpr size_t RING_SIZE = 8;
using ClaimStrategy = disruptor::MultiThreadedStrategy<RING_SIZE>;
using WaitStrategy = disruptor::SleepingStrategy<>;
using NumberSequencer = disruptor::Sequencer<int, RING_SIZE, ClaimStrategy, WaitStrategy>;
#pragma once
#include <iostream>
#include <thread>
#include <windows.h>
#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"
#include "util.h"
class Producer
{
public:
// 传入 Sequencer 指针,进行操作
Producer(std::string name, NumberSequencer* sequencer, int data)
: _name(name)
, _sequencer(sequencer)
, _stop(false)
, _data(data)
{
_thread = new std::thread(&Producer::Start, this);
}
~Producer()
{
_stop = true;
_thread->join();
}
void Start()
{
int onceSize = 1;
while (!_stop)
{
// 1.调用Claim方法,申请写入空间,返回申请到空间的最大的序列号
int seq = _sequencer->Claim(onceSize);
// 2.写入数据
for (int i = seq - onceSize + 1; i <= seq; ++i)
{
(*_sequencer)[i] = _data;
std::cout << std::endl << "Producer " + _name + " write: " + std::to_string(_data);
}
// 3.调用Publish方法,确认已经写入数据的序列号
_sequencer->Publish(seq, onceSize);
std::cout << std::endl << "Producer " + _name + " write to : " << seq;
Sleep(1000);
}
}
private:
NumberSequencer *_sequencer;
std::thread *_thread;
bool _stop;
int _data;
std::string _name;
};
#pragma once
#include <iostream>
#include <thread>
#include <string>
#include <windows.h>
#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"
#include "util.h"
class Consumer
{
public:
Consumer(std::string name, NumberSequencer* sequencer, disruptor::Sequence* handle)
: _name(name)
, _stop(false)
, _handled(handle) //需要维护的进度序列号
, _sequencer(sequencer) //传入 Sequencer 指针进行操作
, _seqWant(disruptor::kFirstSequenceValue) //期望读取的序列号
{
// 1.创建 SequenceBarrier 对象
_barrier = _sequencer->NewBarrier(std::vector<disruptor::Sequence*>());
_thread = new std::thread(&Consumer::Start, this);
}
~Consumer()
{
delete _barrier;
_stop = true;
_thread->join();
}
void Start()
{
while (!_stop)
{
int64_t seqGeted;
// 2.调用 WaitFor 方法获取数据
if ((seqGeted = _barrier->WaitFor(_seqWant)) >= 0)
{
for (int i = _seqWant; i <= seqGeted; ++i)
{
std::cout << std::endl << "Consumer " + _name + " get: " << i;
Sleep(2000);
}
//更新期望序列号
_seqWant = seqGeted + 1;
//维护当前进度序列号
_handled->set_sequence(seqGeted);
}
}
}
private:
NumberSequencer *_sequencer;
disruptor::SequenceBarrier<WaitStrategy> *_barrier;
disruptor::Sequence* _handled;
int64_t _seqWant;
std::thread *_thread;
bool _stop;
std::string _name;
};
#include "util.h"
#include "Consumer.hpp"
#include "Producer.hpp"
#include <array>
#include <memory>
int main()
{
std::array<int, RING_SIZE> numberArray = { 0 };
NumberSequencer *sequencer = new NumberSequencer(numberArray);
//为两个读线程创建两个进度序列号,初值为-1
disruptor::Sequence* handle_c1 = new disruptor::Sequence(disruptor::kInitialCursorValue);
disruptor::Sequence* handle_c2 = new disruptor::Sequence(disruptor::kInitialCursorValue);
//将 sequencer 的门限序列设置为保存有写线程进度序列号的序列
std::vector<disruptor::Sequence*> sequences;
sequences.push_back(handle_c1);
sequences.push_back(handle_c2);
sequencer->set_gating_sequences(sequences);
//开启两个写线程
std::shared_ptr<Producer> p1(new Producer("1", sequencer, 1));
std::shared_ptr<Producer> p2(new Producer("2", sequencer, 2));
//开启两个读线程,分别传入进度序列号
std::shared_ptr<Consumer> c1(new Consumer("1", sequencer, handle_c1));
std::shared_ptr<Consumer> c2(new Consumer("2", sequencer, handle_c2));
getchar();
}