Missmiaom
4/25/2020 - 1:42 AM

example

#pragma once

#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"

//Ring buffer size 必须是2的N次方
// disruptor::kDefaultRingBufferSize;
constexpr size_t RING_SIZE = 8;

using ClaimStrategy = disruptor::MultiThreadedStrategy<RING_SIZE>;
using WaitStrategy = disruptor::SleepingStrategy<>;
using NumberSequencer = disruptor::Sequencer<int, RING_SIZE, ClaimStrategy, WaitStrategy>;
#pragma once
#include <iostream>
#include <thread>
#include <windows.h>
#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"
#include "util.h"

class Producer
{
public:
	// 传入 Sequencer 指针,进行操作
  Producer(std::string name, NumberSequencer* sequencer, int data)
      : _name(name)
      , _sequencer(sequencer)
      , _stop(false)
      , _data(data)
  {
    _thread = new std::thread(&Producer::Start, this);
  }

  ~Producer()
  {
    _stop = true;
    _thread->join();
  }

  void Start()
  {
    int onceSize = 1;

    while (!_stop)
    {		
      // 1.调用Claim方法,申请写入空间,返回申请到空间的最大的序列号
      int seq = _sequencer->Claim(onceSize);

    	// 2.写入数据
      for (int i = seq - onceSize + 1; i <= seq; ++i)
      {
          (*_sequencer)[i] = _data;
          std::cout << std::endl << "Producer " + _name + " write: " + std::to_string(_data);
      }

    	// 3.调用Publish方法,确认已经写入数据的序列号
      _sequencer->Publish(seq, onceSize);
      std::cout << std::endl << "Producer " + _name + " write to : " << seq;

      Sleep(1000);
    }
  }

private:
  NumberSequencer                             *_sequencer;
  std::thread                                 *_thread;
  bool                                        _stop;
  int                                         _data;
  std::string                                 _name;
};
#pragma once
#include <iostream>
#include <thread>
#include <string>
#include <windows.h>
#include "disruptor/sequencer.h"
#include "disruptor/ring_buffer.h"
#include "util.h"

class Consumer
{
public:
  Consumer(std::string name, NumberSequencer* sequencer, disruptor::Sequence* handle)
      : _name(name)
      , _stop(false)
      , _handled(handle)								            //需要维护的进度序列号
      , _sequencer(sequencer)							          //传入 Sequencer 指针进行操作
      , _seqWant(disruptor::kFirstSequenceValue)		//期望读取的序列号
  {
    // 1.创建 SequenceBarrier 对象
    _barrier = _sequencer->NewBarrier(std::vector<disruptor::Sequence*>());
    _thread = new std::thread(&Consumer::Start, this);
  }

  ~Consumer()
  {
    delete _barrier;
    _stop = true;
    _thread->join();
  }

  void Start()
  {
    while (!_stop)
    {
      int64_t seqGeted;
    	// 2.调用 WaitFor 方法获取数据 
      if ((seqGeted = _barrier->WaitFor(_seqWant)) >= 0)
      {
        for (int i = _seqWant; i <= seqGeted; ++i)
        {
            std::cout << std::endl << "Consumer " + _name + " get: " << i;
            Sleep(2000);
        }
      	//更新期望序列号
        _seqWant = seqGeted + 1;
      	//维护当前进度序列号
        _handled->set_sequence(seqGeted);
      }
    }
  }

private:
  NumberSequencer                             *_sequencer;
  disruptor::SequenceBarrier<WaitStrategy>    *_barrier;
  disruptor::Sequence*                        _handled;
  int64_t                                     _seqWant;
  std::thread                                 *_thread;
  bool                                        _stop;
  std::string                                 _name;
};
#include "util.h"
#include "Consumer.hpp"
#include "Producer.hpp"

#include <array>
#include <memory>

int main()
{
  std::array<int, RING_SIZE> numberArray = { 0 };
  NumberSequencer *sequencer = new NumberSequencer(numberArray);

	//为两个读线程创建两个进度序列号,初值为-1
  disruptor::Sequence* handle_c1 = new disruptor::Sequence(disruptor::kInitialCursorValue);
  disruptor::Sequence* handle_c2 = new disruptor::Sequence(disruptor::kInitialCursorValue);

	//将 sequencer 的门限序列设置为保存有写线程进度序列号的序列
  std::vector<disruptor::Sequence*> sequences;
  sequences.push_back(handle_c1);
	sequences.push_back(handle_c2);
  sequencer->set_gating_sequences(sequences);

	//开启两个写线程
  std::shared_ptr<Producer> p1(new Producer("1", sequencer, 1));
  std::shared_ptr<Producer> p2(new Producer("2", sequencer, 2));

	//开启两个读线程,分别传入进度序列号
  std::shared_ptr<Consumer> c1(new Consumer("1", sequencer, handle_c1));
  std::shared_ptr<Consumer> c2(new Consumer("2", sequencer, handle_c2));

  getchar();
}