A wrapper around Riak time series client so you can work with documents and not lists of columns, rows and cells. The abstract repo just requires you to define the table name, columns and a map method.
public abstract class RiakTSRepository<TTSEntry> where TTSEntry : ITimeSeriesEntry
{
protected readonly IRiakClient Client;
protected Store.Builder Builder;
protected RiakTSRepository(IRiakClient client)
{
Client = client;
Builder = new Store.Builder()
.WithTable(TableName)
.WithColumns(Columns.ToList());
}
protected abstract string TableName { get; }
protected abstract IEnumerable<Column> Columns { get; }
protected abstract IEnumerable<Cell> Map(TTSEntry item);
protected abstract TTSEntry Map(IEnumerable<Cell> cells);
public RiakResult Insert(TTSEntry item)
{
var cells = Map(item);
var cmd = Builder
.WithRow(new Row(cells))
.Build();
return Client.Execute(cmd);
}
public RiakResult Insert(IEnumerable<TTSEntry> items)
{
var cellsSet = items.Select(Map);
var rows = cellsSet
.Select(x => new Row(x))
.ToArray();
var cmd = Builder
.WithRows(rows)
.Build();
return Client.Execute(cmd);
}
public RiakTSQueryResult<TTSEntry> Query(string format, params object[] parameters)
{
var query = string.Format(format, parameters);
return Query(query);
}
public RiakTSQueryResult<TTSEntry> Query(string query)
{
var cmd = new Query.Builder()
.WithTable(TableName)
.WithQuery(query)
.Build();
var result = Client.Execute(cmd);
if (!result.IsSuccess) return new RiakTSQueryResult<TTSEntry>(result);
var items = cmd.Response
.Value
.Select(x => Map(x.Cells))
.ToList();
return new RiakTSQueryResult<TTSEntry>(result, items);
}
}
public class RiakTSQueryResult<TTSEntry>
{
private readonly RiakResult _result;
public RiakTSQueryResult(RiakResult result) : this(result, null)
{
}
public RiakTSQueryResult(RiakResult result, IEnumerable<TTSEntry> queryResult)
{
QueryResult = queryResult;
_result = result;
ErrorMessage = result.ErrorMessage;
IsSuccess = result.IsSuccess;
ResultCode = result.ResultCode;
Exception = result.Exception;
}
public bool IsSuccess { get; }
public ResultCode ResultCode { get; }
public Exception Exception { get; }
public string ErrorMessage { get; }
public IEnumerable<TTSEntry> QueryResult { get; }
}
public interface ITimeSeriesEntry
{
DateTime TimeStamp { get; set; }
}
public class TransactionRepository : RiakTSRepository<TestTransaction>
{
protected override string TableName => "Transaction";
protected override IEnumerable<Column> Columns
{
get
{
var columns = new[]
{
new Column("category", ColumnType.Varchar),
new Column("subcategory", ColumnType.Varchar),
new Column("time", ColumnType.Timestamp),
new Column("id", ColumnType.Varchar),
new Column("value", ColumnType.Double),
new Column("somedata", ColumnType.Varchar)
};
return columns;
}
}
public TransactionRepository(IRiakClient client) : base(client)
{
}
public RiakResult CreateTable()
{
var command = new Query(new QueryOptions(TableName)
{
Query = @"CREATE TABLE Transaction
(
bedeplayerid varchar NOT NULL,
category VARCHAR NOT NULL,
subcategory VARCHAR NOT NULL,
time TIMESTAMP NOT NULL,
id VARCHAR NOT NULL,
value DOUBLE,
somedata VARCHAR NOT NULL,
PRIMARY KEY (
(bedeplayerid, QUANTUM(time, 365, 'd')),
bedeplayerid, time, id
)
)"
});
return Client.Execute(command);
}
public RiakTSQueryResult<TestTransaction> GetTransactions(string bedePlayerID, DateTime at)
{
var from = DateTimeUtil.ToUnixTimeMillis(at) -1;
var to = DateTimeUtil.ToUnixTimeMillis(at) + 1;
var query = $"SELECT * FROM {TableName} WHERE bedeplayerid = '{bedePlayerID}' and time > {from} and time < {to}";
return Query(query);
}
public RiakTSQueryResult<TestTransaction> GetTransactions(string bedePlayerID, DateTime from, DateTime to)
{
var query = $"SELECT * FROM {TableName} WHERE bedeplayerid = '{bedePlayerID}' and time > {DateTimeUtil.ToUnixTimeMillis(from)} and time < {DateTimeUtil.ToUnixTimeMillis(to)}";
return Query(query);
}
protected override IEnumerable<Cell> Map(TestTransaction item)
{
var cells = new Cell[]
{
new Cell<string>(item.BedePlayerId),
new Cell<string>(item.Category),
new Cell<string>(item.SubCateogry),
new Cell<DateTime>(item.TimeStamp),
new Cell<string>(item.Id),
new Cell<double>(item.Value),
new Cell<string>(item.SomeData)
};
return cells;
}
protected override TestTransaction Map(IEnumerable<Cell> cells)
{
var array = cells.ToArray();
var map = new TestTransaction
{
BedePlayerId = array[0].AsObject.ToString(),
Category = array[1].AsObject.ToString(),
SubCateogry = array[2].AsObject.ToString(),
TimeStamp = (DateTime)array[3].AsObject,
Id = array[4].AsObject.ToString(),
Value = double.Parse(array[5].AsObject.ToString()),
SomeData = array[6].AsObject.ToString()
};
return map;
}
}
public class TestTransaction : ITimeSeriesEntry
{
public string BedePlayerId { get; set; }
public string Category { get; set; }
public string SubCateogry { get; set; }
public string Id { get; set; }
public double Value { get; set; }
public DateTime TimeStamp { get; set; }
public string SomeData { get; set; }
}