Load and commit

parent f01bf5083a
commit c5cc2f35a5
@@ -5,8 +5,8 @@ import std/nre, std/os, std/strutils, std/tables, std/parseopt, std/streams, std
 import cbor
 import ./blobsets, ./blobsets/filestores
 
-when defined(genode):
-  import dagfsclient
+#when defined(genode):
+#  import dagfsclient
 
 #else:
 #  import ./blobsets/tcp
@@ -43,7 +43,6 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
   except:
     let e = getCurrentException()
     writeLine(stderr, "failed to ingest '", path, "', ", e.msg)
-    # raise e
 
 proc ingestMain() =
   var args = newSeq[string]()
@@ -252,7 +251,11 @@ proc print(a: Atom; s: Stream) =
     s.write '|'
     s.write $a.size
   of atomSet:
-    s.write "«set»"
+    case a.bs.kind
+    of coldNode:
+      s.write $a.bs.setId
+    else:
+      s.write "«set»"
   of atomString:
     s.write '"'
     s.write a.str
@@ -390,7 +393,8 @@ proc cborFunc(env: Env; arg: NodeObj): NodeRef =
 
 proc commitFunc(env: Env; arg: NodeObj): NodeRef =
   assertArgCount(arg, 1)
-  raiseAssert("not implemented")
+  let cold = commit(env.store, arg.atom.bs)
+  cold.newAtom.newNode
 
 #[
 proc copyFunc(env: Env; args: NodeObj): NodeRef =
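The new `commitFunc` is the write half of a round trip that the `load` builtin added below completes: `commit` flushes a hot set into the store and returns a cold node carrying only a `SetId`, while `loadSet` rebuilds a hot node from that id. A minimal Nim-level sketch of the round trip, assuming a `store` and a hot set `bs` are in scope:

    let cold = store.commit(bs)          # serialize to CBOR, ingest as a metaBlob
    assert(cold.kind == coldNode)        # only the SetId survives in memory
    let hot = store.loadSet(cold.setId)  # decode the CBOR back into a hot node
    assert(hot.kind == hotNode)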
@@ -474,6 +478,11 @@ proc listFunc(env: Env; args: NodeObj): NodeRef =
   while not result.tailRef.nextRef.isNil:
     result.tailRef = result.tailRef.nextRef
 
+proc loadFunc(env: Env; args: NodeObj): NodeRef =
+  assertArgCount(args, 1)
+  let bs = env.store.loadSet args.atom.bs.setId
+  bs.newAtom.newNode
+
 proc mapFunc(env: Env; args: NodeObj): NodeRef =
   assertArgCount(args, 2)
   result = newNodeList()
@@ -551,6 +560,7 @@ proc newEnv(store: BlobStore): Env =
   result.bindEnv "ingest", ingestFunc
   result.bindEnv "insert", insertFunc
   result.bindEnv "key", keyFunc
+  result.bindEnv "load", loadFunc
  result.bindEnv "list", listFunc
  result.bindEnv "map", mapFunc
  #result.bindEnv "merge", mergeFunc
@@ -638,7 +648,7 @@ proc replMain() =
     scripted = true
   let
     #store = openStore()
-    store = newNullStore() # newFileStore("/tmp/blobs")
+    store = newFileStore("/tmp/blobs")
     env = newEnv(store)
     outStream = stdout.newFileStream
     readLine = if scripted: readLineSimple else: readLineFromStdin
src/blobsets.nim (279 lines changed)
@@ -36,8 +36,7 @@ func `$`*(bh: BlobId): string =
     let r = (Rune)baseRune or b.int
     fastToUTF8Copy(r, result, pos, true)
 
-func toBlobId*(s: string): BlobId =
-  ## Parse a visual blob hash to binary.
+func parseStringId[T](s: string): T =
   if s.len == blobVisualLen:
     var
       pos: int
@@ -46,6 +45,28 @@ func toBlobId*(s: string): BlobId =
       fastRuneAt(s, pos, r, true)
       b = r.byte
 
+func parseCborId[T](c: CborNode): T =
+  ## Parse a CBOR node to binary.
+  if c.bytes.len == result.data.len:
+    for i in 0..result.data.high:
+      result.data[i] = c.bytes[i]
+
+func toBlobId*(s: string): BlobId =
+  ## Parse a visual blob hash to binary.
+  parseStringId[BlobId] s
+
+func toBlobId(c: CborNode): BlobId =
+  ## Parse a CBOR blob hash to binary.
+  parseCborId[BlobId] c
+
+func toSetId*(s: string): SetId =
+  ## Parse a visual set hash to binary.
+  parseStringId[SetId] s
+
+func toSetId(c: CborNode): SetId =
+  ## Parse a CBOR set hash to binary.
+  parseCborId[SetId] c
+
 proc `==`*(x, y: BlobId): bool = x.data == y.data
   ## Compare two BlobIds.
 
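Both id types wrap the same fixed-size `data` array, which is what lets one generic pair of helpers replace four near-identical parsers. A sketch of the dispatch, assuming `s` holds a valid visual hash:

    let blob = s.toBlobId   # parseStringId[BlobId]
    let sid  = s.toSetId    # parseStringId[SetId]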
@@ -182,14 +203,19 @@ func toCbor(k: Key): CborNode =
   ## Keys are endian independent.
   newCborBytes cast[array[sizeof(k), byte]](k)
 
+const
+  # CBOR tags
+  nodeTag = 0
+  leafTag = 1
+
 type
-  setKind* = enum hotNode, coldNode, leafNode
+  SetKind* = enum hotNode, coldNode, leafNode
   BlobSet* = ref BlobSetObj
   BlobSetObj = object
-    case kind*: setKind
+    case kind*: SetKind
     of hotNode:
       bitmap: Key
-      table*: seq[BlobSet]
+      table: seq[BlobSet]
     of coldNode:
       setId*: SetId
     of leafNode:
@@ -351,33 +377,51 @@ func remove*(trie: BlobSet; name: string): BlobSet =
     result = newBlobSet()
 
 func toCbor*(x: BlobSet): CborNode =
-  const
-    nodeTag = 0
-    leafTag = 1
-  let array = newCborArray()
   case x.kind
   of hotNode:
-    var
-      map = x.bitmap
-      buf = newCborBytes(sizeof(Key))
-    when not sizeof(Key) == 8:
-      {.error: "unknown key conversion".}
-    bigEndian64(buf.bytes[0].addr, map.addr)
-    array.add buf
+    let array = newCborArray()
+    array.add x.bitmap
     for y in x.table:
       array.add y.toCbor
     newCborTag(nodeTag, array)
   of coldNode:
-    array.add x.setId.data
-    newCborTag(nodeTag, array)
+    newCborTag(nodeTag, x.setId.data.newCborBytes)
   of leafNode:
-    array.add x.key.toCbor
+    let array = newCborArray()
+    array.add x.key
     array.add x.blob.data
     array.add x.size
     newCborTag(leafTag, array)
 
 func leafCount*(size: Natural): int = (size+blobLeafSize-1) div blobLeafSize
 
+func compressTree*(leaves: var seq[BlobId]) =
+  var
+    ctx: Blake2b256
+    nodeOffset = 0
+    nodeDepth = 0
+  while leaves.len > 1:
+    nodeOffset = 0
+    inc nodeDepth
+    var pos, next: int
+    while pos < leaves.len:
+      ctx.init do (params: var Blake2bParams):
+        params.fanout = 2
+        params.depth = 255
+        params.leafLength = blobLeafSize
+        params.nodeOffset = nodeOffset
+        params.nodeDepth = nodeDepth
+      inc nodeOffset
+      ctx.update(leaves[pos].data)
+      inc pos
+      if pos < leaves.len:
+        ctx.update(leaves[pos].data)
+        inc pos
+      leaves[next] = ctx.finish()
+      inc next
+    leaves.setLen(next)
+  # TODO: BLAKE2 tree finalization flags
+
 type
   BlobKind* = enum
     dataBlob, metaBlob
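Together with the tag constants hoisted to the top of the module, `toCbor` now emits one of three shapes; in CBOR diagnostic notation this is roughly (a sketch read off the code, not a normative spec):

    0([bitmap, child0, child1, ...])    ; hotNode: tag 0 over an array
    0(h'<setId bytes>')                 ; coldNode: tag 0 over a byte string
    1([key, h'<blob bytes>', size])     ; leafNode: tag 1 over a triple

The relocated `compressTree` collapses one tree level per pass, hashing adjacent leaf pairs (an odd trailing leaf is hashed alone), so for example:

    # Sketch, assuming l0..l4 are leaf BlobIds of one ingested blob:
    var leaves = @[l0, l1, l2, l3, l4]
    compressTree(leaves)     # passes shrink the seq 5 -> 3 -> 2 -> 1
    assert(leaves.len == 1)  # leaves[0] is the root BlobId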
@@ -421,98 +465,8 @@ type
     openBlobStreamImpl*: proc (s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream {.nimcall, gcsafe.}
     openIngestStreamImpl*: proc (s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream {.nimcall, gcsafe.}
 
-proc close*(s: BlobStore) =
-  ## Close active store resources.
-  if not s.closeImpl.isNil: s.closeImpl(s)
-
-proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
-  ## Return a new `BlobStream` for reading a blob.
-  assert(not s.openBlobStreamImpl.isNil)
-  s.openBlobStreamImpl(s, id, size, kind)
-
-proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
-  ## Return a new `IngestStream` for ingesting a blob.
-  assert(not s.openIngestStreamImpl.isNil)
-  s.openIngestStreamImpl(s, size, kind)
-
-func compressTree*(leaves: var seq[BlobId]) =
-  var
-    ctx: Blake2b256
-    nodeOffset = 0
-    nodeDepth = 0
-  while leaves.len > 1:
-    nodeOffset = 0
-    inc nodeDepth
-    var pos, next: int
-    while pos < leaves.len:
-      ctx.init do (params: var Blake2bParams):
-        params.fanout = 2
-        params.depth = 255
-        params.leafLength = blobLeafSize
-        params.nodeOffset = nodeOffset
-        params.nodeDepth = nodeDepth
-      inc nodeOffset
-      ctx.update(leaves[pos].data)
-      inc pos
-      if pos < leaves.len:
-        ctx.update(leaves[pos].data)
-        inc pos
-      leaves[next] = ctx.finish()
-      inc next
-    leaves.setLen(next)
-  # TODO: BLAKE2 tree finalization flags
-
-iterator dumpBlob*(store: BlobStore; id: BlobId): string =
-  var
-    stream = store.openBlobStream(id, kind=dataBlob)
-    buf = newString(blobLeafSize)
-  defer:
-    close stream
-  while true:
-    buf.setLen(blobLeafSize)
-    let n = stream.read(buf[0].addr, buf.len)
-    if n == 0:
-      break
-    buf.setLen(n)
-    yield buf
-
-proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
-  assert(bs.kind == hotNode)
-  for e in bs.table.mitems:
-    case e.kind
-    of coldNode, leafNode: discard
-    of hotNode:
-      e = store.commit e
-  let stream = store.openIngestStream(kind=metaBlob)
-  var buf = encode bs.toCbor
-  stream.ingest(buf)
-  let (id, _) = finish stream
-  result = BlobSet(kind: coldNode, setId: id)
-
-proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
-  # TODO: lazy-load set
-  bs.apply(name, f)
-
-proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
-  # TODO: lazy-load set
-  insert(bs, name, blob, size)
-
-proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
-  # TODO: lazy-load set
-  remove(bs, name)
-
-proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
-  ## Return the union of `sets`.
-  # TODO: lazy-load set
-  var fresh = newBlobSet()
-  proc freshInsert(leaf: BlobSet) =
-    fresh = insert(fresh, leaf)
-  for bs in sets:
-    assert(not bs.isnil)
-    bs.apply(freshInsert)
-  result = fresh
-
 # Store implementations
 #
 # Null Store implementation
 #
 
 type
@@ -567,3 +521,106 @@ proc newNullStore*(): BlobStore =
   BlobStore(
     openBlobStreamImpl: nullOpenBlobStream,
     openIngestStreamImpl: nullOpenIngestStream)
+
+proc close*(s: BlobStore) =
+  ## Close active store resources.
+  if not s.closeImpl.isNil: s.closeImpl(s)
+
+proc openBlobStream*(s: BlobStore; id: BlobId; size = 0.BiggestInt; kind = dataBlob): BlobStream =
+  ## Return a new `BlobStream` for reading a blob.
+  assert(not s.openBlobStreamImpl.isNil)
+  s.openBlobStreamImpl(s, id, size, kind)
+
+proc openIngestStream*(s: BlobStore; size = 0.BiggestInt; kind = dataBlob): IngestStream =
+  ## Return a new `IngestStream` for ingesting a blob.
+  assert(not s.openIngestStreamImpl.isNil)
+  s.openIngestStreamImpl(s, size, kind)
+
+iterator dumpBlob*(store: BlobStore; id: BlobId): string =
+  var
+    stream = store.openBlobStream(id, kind=dataBlob)
+    buf = newString(blobLeafSize)
+  defer:
+    close stream
+  while true:
+    buf.setLen(blobLeafSize)
+    let n = stream.read(buf[0].addr, buf.len)
+    if n == 0:
+      break
+    buf.setLen(n)
+    yield buf
+
+proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
+  if Key.high shr depth == 0:
+    raiseAssert("loadSet trie is too deep")
+  var
+    stream = store.openBlobStream(id, kind=metaBlob)
+    buf = newString(blobLeafSize)
+  defer:
+    close stream
+  let n = stream.read(buf[0].addr, buf.len)
+  buf.setLen(n)
+  let
+    c = buf.parseCbor.val
+    bitmap = c.seq[0].getInt
+  if bitmap.countSetBits != c.seq.len-1:
+    let bits = bitmap.countSetBits
+    raise newException(ValueError, "invalid set CBOR, bitmap is " & $bits & " and sequence len is " & $c.seq.len)
+  result = BlobSet(
+    kind: hotNode,
+    bitmap: bitmap,
+    table: newSeqOfCap[BlobSet](c.seq.len-1))
+  for i in 1..c.seq.high:
+    let node = c[i].val
+    case c[i].tag.int
+    of nodeTag:
+      result.table.add loadSet(store, node.toSetId, depth+1)
+    of leafTag:
+      let
+        leaf = BlobSet(
+          kind: leafNode,
+          key: getInt node[0],
+          blob: parseCborId[BlobId] node[1],
+          size: getInt node[2])
+      result.table.add leaf
+    else:
+      raise newException(ValueError, "invalid set CBOR")
+
+proc loadSet*(store: BlobStore; id: SetId): BlobSet =
+  loadSet store, id, 0
+
+proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
+  assert(bs.kind == hotNode)
+  for e in bs.table.mitems:
+    case e.kind
+    of coldNode, leafNode: discard
+    of hotNode:
+      e = store.commit e
+  let stream = store.openIngestStream(kind=metaBlob)
+  var buf = encode bs.toCbor
+  stream.ingest(buf)
+  let (id, _) = finish stream
+  result = BlobSet(kind: coldNode, setId: id)
+
+proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
+  # TODO: lazy-load set
+  bs.apply(name, f)
+
+proc insert*(store: BlobStore; bs: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet =
+  # TODO: lazy-load set
+  insert(bs, name, blob, size)
+
+proc remove*(store: BlobStore; bs: BlobSet; name: string): BlobSet =
+  # TODO: lazy-load set
+  remove(bs, name)
+
+proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
+  ## Return the union of `sets`.
+  # TODO: lazy-load set
+  var fresh = newBlobSet()
+  proc freshInsert(leaf: BlobSet) =
+    fresh = insert(fresh, leaf)
+  for bs in sets:
+    assert(not bs.isnil)
+    bs.apply(freshInsert)
+  result = fresh
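The consistency check in `loadSet` enforces the invariant that `toCbor` establishes: the first array element is the bitmap and every set bit claims exactly one child, so `countSetBits(bitmap)` must equal `c.seq.len - 1`. For instance:

    # A node with bitmap 0b1010 must decode as [0b1010, child, child];
    # any other child count raises ValueError.
    assert(countSetBits(0b1010) == 2)

The depth guard plays a similar defensive role: each recursion level consumes key bits, so once `Key.high shr depth == 0` the trie is deeper than a `Key` can address and the stored set must be corrupt.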
@@ -51,7 +51,11 @@ proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind
   var fs = FileStore(s)
   let stream = FsBlobStream()
   result = stream
-  stream.path = fs.root / $id
+  case kind
+  of dataBlob:
+    stream.path = fs.root / "data" / $id
+  of metaBlob:
+    stream.path = fs.root / "blob" / $id
   stream.file = openAsync(stream.path, fmRead)
   stream.closeImpl = fsBlobClose
   stream.readImpl = fsBlobRead
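With this split, and the matching directories created by `newFileStore` in the next hunk, a store rooted at `/tmp/blobs` separates blobs by kind on disk; roughly (ids abbreviated):

    /tmp/blobs/
      data/<blobId>   # dataBlob payloads
      blob/<blobId>   # metaBlob set nodes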
@@ -108,8 +112,8 @@ proc fsOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestS
   stream.leaves = newSeq[BlobId]()
 
 proc newFileStore*(root = "/"): FileStore =
-  if not existsDir(root):
-    createDir root
+  createDir(root / "data")
+  createDir(root / "blob")
   new result
   result.openBlobStreamImpl = fsOpenBlobStream
   result.openIngestStreamImpl = fsOpenIngestStream