lshifr
5/14/2012 - 7:55 PM

Large data framework

Large data framework

(* Hook variables.  Other definitions in this file call the $-prefixed
   names ($fileNameFunction, $importFunction, ...) rather than the helpers
   directly, so reassigning a hook changes what those definitions use. *)
ClearAll[$fileNameFunction, $importFunction, $exportFunction,
  $compressFunction, $uncompressFunction];
(* The helper symbols are wiped as well, so the hooks below capture the
   bare symbols; the helpers receive their definitions afterwards. *)
ClearAll[fileName, import, export];

(* Compression pair *)
$uncompressFunction = Uncompress;
$compressFunction = Compress;

(* File naming and file I/O *)
$exportFunction = export;
$importFunction = import;
$fileNameFunction = fileName;


(* fileName[dir, hash]: path inside dir of the form data<hash>.dat *)
fileName[dir_, hash_] :=
   FileNameJoin[{dir, "data" <> ToString[hash] <> ".dat"}];
(* mxFileName[dir, hash]: same naming scheme with an .mx extension *)
mxFileName[dir_, hash_] :=
   FileNameJoin[{dir, "data" <> ToString[hash] <> ".mx"}];
(* import[path]: read the whole file at path as a single string *)
import =
   Function[{path}, Import[path, "String"]];
(* export[path, payload]: write payload to path using the "String" format *)
export =
   Function[{path, payload},
      Export[path, payload, "String"]];
(* mxImport[path]: Get the file at path inside a Block that localizes the
   symbol data, then return whatever value data received from the file. *)
mxImport =
   Function[{path}, Block[{data}, Get[path]; data]];
(* mxExport[path, payload]: temporarily assign payload to the symbol data
   and DumpSave that symbol to path.  The symbol name data is what
   mxImport reads back, so it must stay the same in both functions. *)
mxExport =
   Function[{path, payload},
       Block[{data = payload}, DumpSave[path, data]]];


ClearAll[definePartAPI];
(* definePartAPI[s, part, dir]
   Attaches to the symbol s (as upvalues) the storage operations for
   element number part, backed by one file inside directory dir:
     Part[s, part], releasePart, savedOnDisk, removePartOnDisk,
     savePartOnDisk, releaseAPI, and - once per s - setPart.
   Local bindings:
     sym   - a fresh symbol from Unique[]; it holds the loaded value
     hash  - Hash[sym], used to build the file name
     fname - the file path produced by $fileNameFunction[dir, hash] *)
definePartAPI[s_Symbol, part_Integer, dir_String] :=
 LetL[{sym = Unique[], hash = Hash[sym], 
     fname = $fileNameFunction[dir, hash]
   },
   (* Lazy load: the first evaluation of sym imports and uncompresses the
      file and stores the result as the value of sym *)
   sym := sym =  $uncompressFunction@$importFunction[fname];
   (* Reading element number part of s returns sym *)
   s /: HoldPattern[Part[s, part]] := sym;

   (* Release memory and renew for next reuse: clear sym and re-install
      the lazy loader.  Hold + Replace inserts the loader expression into
      the new definition without evaluating it. *)
   s /: releasePart[s, part] :=
       Replace[Hold[$uncompressFunction@$importFunction[fname]], 
          Hold[def_] :> (ClearAll[sym]; sym := sym = def)];

   (* Check whether the backing file exists on disk *)
   s /: savedOnDisk[s, part] := FileExistsQ[fname];

   (* Remove the backing file from disk (no existence check here) *)
   s /: removePartOnDisk[s, part] := DeleteFile[fname];

   (* Compress value and write it to the backing file *)
   s /: savePartOnDisk[s, part, value_] :=
      $exportFunction[fname, $compressFunction @value];

   (* Set a given part to a new value: save it, reset the cached copy,
      return the value.  This rule matches any part pt, so it is installed
      only once per s; setPartDefined[s] records that it exists. *)
   If[! TrueQ[setPartDefined[s]],
     s /: setPart[s, pt_, value_] :=
       Module[{},
         savePartOnDisk[s, pt, value];
         releasePart[s, pt];
         value
       ];
     s /: setPartDefined[s] = True;
   ];
(* Release the API for this part by removing the symbol sym. Irreversible *)
s /: releaseAPI[s, part] := Remove[sym];
];


(* LetL[{a = ..., b = ..., ...}, body]
   Sequential-binding form of With: the bindings are turned into nested
   With expressions, one per binding, so a later binding can refer to an
   earlier one. *)
ClearAll[LetL];
SetAttributes[LetL, HoldAll];
(* Definition-time expansion: when LetL[{...}, body] is the entire
   right-hand side of a := definition, this upvalue on SetDelayed replaces
   it with the nested With form before the definition is stored.  Inside
   the Block, With has no rules and only the HoldAll attribute, so the
   nested With expression is built without being executed. *)
LetL /: Verbatim[SetDelayed][lhs_, rhs : HoldPattern[LetL[{__}, _]]] :=  
  Block[{With},
    Attributes[With] = {HoldAll};
    lhs := Evaluate[rhs]];
(* No bindings: just the body *)
LetL[{}, expr_] := expr;
(* One binding: an ordinary With *)
LetL[{head_}, expr_] := With[{head}, expr];
(* Several bindings: wrap the first around the expansion of the rest.
   With is kept inert while the inner LetL is expanded via Evaluate; the
   resulting nested With is evaluated after the Block exits. *)
LetL[{head_, tail__}, expr_] :=
  Block[{With}, Attributes[With] = {HoldAll};
   With[{head}, Evaluate[LetL[{tail}, expr]]]];


ClearAll[appendTo];
Options[appendTo] = {
   ElementSizeLimit :> $elementSizeLimit,
   DestinationDirectory :> $destinationDirectory
 };
(* appendTo[s, value, opts]
   Adds value as the next element (position Length[s] + 1) of the list
   represented by the upvalues of s, and updates Length[s].
   Options:
     ElementSizeLimit     - ByteCount threshold; larger values go to disk
     DestinationDirectory - directory passed to definePartAPI
   Values whose ByteCount exceeds the limit are written to disk through
   savePartOnDisk; smaller values are kept in memory in compressed form. *)
appendTo[s_Symbol, value_, opts : OptionsPattern[]] :=
  LetL[{len = Length[s], part = len + 1,
     dir = OptionValue[DestinationDirectory],
     blim = OptionValue[ElementSizeLimit]
    },
    (* Install the per-part API exactly once.  A second definePartAPI call
       for the same part would overwrite these rules with ones bound to a
       new Unique[] symbol and leave the first symbol behind, out of reach
       of releaseAPI. *)
    definePartAPI[s, part, dir];
    s /: Length[s] = part;
    If[ByteCount[value] > blim,
       (* large element: store on disk and reset the lazy loader *)
       savePartOnDisk[s, part, value];
       releasePart[s, part],
       (* else: keep the compressed value in memory; the first access
          uncompresses it and caches the result as an upvalue.
          NOTE(review): definePartAPI has already installed a
          HoldPattern[Part[s, part]] rule for this part - confirm that the
          rule defined here is the one that fires for in-memory elements. *)
       With[{compressed = $compressFunction @value}, 
         s /: Part[s, part] := 
            (s /: Part[s, part] = $uncompressFunction@compressed);
         (* deeper indexing is delegated to the element itself *)
         s /: Part[s, part, parts___] := Part[s, part][[parts]];
  ]]];


ClearAll[initList];
(* initList[s]
   Clears s and installs upvalues that make s act as an empty list:
   Length[s] is 0, and Take, Drop, Map, First, Last, Rest, Most and
   Position are expressed in terms of Length[s] and s[[i]].
   The element rules for s[[i]] are not defined here.
   Take[s, n] and Drop[s, n] are written for a positive count n. *)
initList[s_Symbol] :=
  Module[{},
   ClearAll[s];
   (* A freshly initialised list is empty *)
   s /: Length[s] = 0;

   (* Take: the list-argument forms are defined before the bare-count
      form, because the pattern n_ would also match {n} and {m, n} *)
   s /: HoldPattern[Take[s, {n_}]] := s[[n]];
   s /: HoldPattern[Take[s, {m_, n_}]] := Table[s[[i]], {i, m, n}];
   s /: HoldPattern[Take[s, n_]] := Take[s, {1, n}];

   (* Drop: same ordering, most specific first *)
   s /: HoldPattern[Drop[s, {n_}]] := Drop[s, {n, n}];
   (* keep the indices before m and after n; Join concatenates the two
      index ranges that Table then iterates over *)
   s /: HoldPattern[Drop[s, {m_, n_}]] :=
        Table[s[[i]], {i, Join[Range[m - 1], Range[n + 1, Length[s]]]}];
   s /: HoldPattern[Drop[s, n_]] := 
      Table[s[[i]], {i, n + 1, Length[s]}];

   s /: Map[f_, s] := Table[f[s[[i]]], {i, Length[s]}];
   s /: HoldPattern[First[s]] := s[[1]];
   s /: HoldPattern[Last[s]] := s[[Length[s]]];
   s /: HoldPattern[Rest[s]] := Drop[s, 1];
   s /: HoldPattern[Most[s]] := Take[s, {1, Length[s] - 1}];

   (* Position: indices (each wrapped in a list) of the elements matching
      patt; {} when nothing matches *)
   s /: Position[s, patt_] :=
      If[# === {}, {}, First@#] &@
        Reap[Do[If[MatchQ[s[[i]], patt], Sow[{i}]], {i, Length[s]}]][[2]]
  ];


(* Clear any global definitions of the per-part API names *)
ClearAll[releasePart, savedOnDisk, removePartOnDisk, savePartOnDisk,
   releaseAPI]
(* Default ByteCount threshold used by the ElementSizeLimit option of appendTo *)
$elementSizeLimit = 50000;
(* Default directory used by the DestinationDirectory options *)
$destinationDirectory = $TemporaryDirectory;


ClearAll[appendList];
(* appendList[s, l, opts]: append every element of the list l to s, in
   order, forwarding opts unchanged to each appendTo call. Returns Null. *)
appendList[s_Symbol, l_List, opts : OptionsPattern[]] :=
   Scan[appendTo[s, #, opts] &, l];

ClearAll[removeStorage];
(* removeStorage[s]: for every index 1..Length[s], delete the backing file
   of that part when savedOnDisk reports that it exists. Returns Null. *)
removeStorage[s_Symbol] :=
   Scan[
     If[savedOnDisk[s, #], removePartOnDisk[s, #]] &,
     Range[Length[s]]];

ClearAll[releaseAllMemory];
(* releaseAllMemory[s]: call releasePart for every index 1..Length[s].
   Returns Null. *)
releaseAllMemory[s_Symbol] :=
   Scan[releasePart[s, #] &, Range[Length[s]]];


(* getDependencies[s]
   Collects every symbol outside the System` context that occurs anywhere
   in UpValues[s] (heads included) and returns HoldComplete[{s, ...}]: s
   first, followed by the sorted, de-duplicated symbols that were found.
   Only the upvalues of s itself are scanned, i.e. dependencies are
   followed one step deep. *)
ClearAll[getDependencies];
getDependencies[s_Symbol] :=
 With[{found =
    Union[
     Cases[UpValues[s],
      sym_Symbol /; Context[sym] =!= "System`" :> HoldComplete[sym],
      {0, Infinity}, Heads -> True]]},
  (* each symbol stays wrapped in HoldComplete until Thread merges the
     wrappers into a single HoldComplete around the list *)
  Thread[Join[{HoldComplete[s]}, found], HoldComplete]] 


ClearAll[getMainListFileName];
Options[getMainListFileName] = {
   DestinationDirectory :> $destinationDirectory,
   ListFileName -> Automatic
 };
(* getMainListFileName[s, opts]
   Returns the full path of the main definition file for s: the
   DestinationDirectory option joined with the ListFileName option.
   With ListFileName -> Automatic the file is named after the symbol,
   ToString[s] <> ".m". *)
getMainListFileName[s_Symbol, opts : OptionsPattern[]] :=
  FileNameJoin[{
    OptionValue[DestinationDirectory],
    Replace[OptionValue[ListFileName],
      Automatic :> ToString[s] <> ".m"]
   }];


ClearAll[storeMainList];
(* storeMainList[s, opts]
   Writes the definitions of s and of the symbols returned by
   getDependencies[s] to the main list file.  Only the options known to
   getMainListFileName are forwarded when computing the file path.
   Steps: reset every part via releaseAllMemory, delete an existing file
   at the target path, then Save the symbols. *)
storeMainList[s_Symbol, opts : OptionsPattern[]] :=
  With[{target =
      getMainListFileName[s,
        Sequence @@ FilterRules[{opts}, Options[getMainListFileName]]]},
    releaseAllMemory[s];
    If[FileExistsQ[target], DeleteFile[target]];
    (* the symbol list arrives wrapped in HoldComplete; Unevaluated hands
       it to Save without evaluating the symbols *)
    Replace[getDependencies[s],
       HoldComplete[deps_] :> Save[target, Unevaluated[deps]]]];


ClearAll[retrieveMainList];
(* retrieveMainList[s, opts]
   Reads the main list file for s (path from getMainListFileName, using
   only the options that function knows) as held expressions and executes
   them, with TagSet and UpSet rewritten to their delayed forms.
   Returns $Failed when the import fails; otherwise returns the result of
   ReleaseHold on the rewritten expressions.
   The success test is an explicit If.  A rhs /; test condition shares
   local variables only when it is itself the body of With/Module/Block;
   followed by ; it becomes part of a CompoundExpression, which is not
   that form. *)
retrieveMainList[s_Symbol, opts : OptionsPattern[]] :=
  With[{imported =
      Import[
        getMainListFileName[s,
          Sequence @@ FilterRules[{opts}, Options[getMainListFileName]]],
        "HeldExpressions"]},
    If[imported === $Failed,
      $Failed,
      (* imported is a list of held definitions; releasing the hold
         executes them *)
      ReleaseHold[imported /.
         {TagSet -> TagSetDelayed, UpSet -> UpSetDelayed}]
    ]];

(* Any other argument form fails *)
retrieveMainList[___] := $Failed;


ClearAll[deleteListComplete];
(* deleteListComplete[s, opts]
   Tears the list s down completely, in this order: delete the per-part
   files (removeStorage), delete the main list file if it exists, call
   releaseAPI for every index 1..Length[s], and finally ClearAll[s].
   Only the options known to getMainListFileName are used to locate the
   main list file. *)
deleteListComplete[s_Symbol, opts : OptionsPattern[]] :=
 With[{target =
     getMainListFileName[s,
       Sequence @@ FilterRules[{opts}, Options[getMainListFileName]]]},
    removeStorage[s];
    If[FileExistsQ[target], DeleteFile[target]];
    Scan[releaseAPI[s, #] &, Range[Length[s]]];
    ClearAll[s]];