More futures

This commit is contained in:
Ehmry - 2019-03-18 22:50:00 +01:00
parent 6113019f15
commit 49ca99b557
7 changed files with 84 additions and 74 deletions

View File

@ -302,7 +302,7 @@ componentConstructHook = proc(env: GenodeEnv) =
proc createSession(env: GenodeEnv; store: BlobStore; id: ServerId; label: string; setId: SetId; txBufSize: int) = proc createSession(env: GenodeEnv; store: BlobStore; id: ServerId; label: string; setId: SetId; txBufSize: int) =
let let
fsSet = store.load setId fsSet = waitFor store.load setId
session = env.newSession(store, label, setId, fsSet, txBufSize) session = env.newSession(store, label, setId, fsSet, txBufSize)
cap = env.ep.manage session cap = env.ep.manage session
sessions[id] = session sessions[id] = session
@ -373,7 +373,7 @@ componentConstructHook = proc(env: GenodeEnv) =
idStr = pAttrs["root"] idStr = pAttrs["root"]
policySetId = toSetId idStr policySetId = toSetId idStr
if session.fsSetId != policySetId: if session.fsSetId != policySetId:
let newSet = store.load policySetId let newSet = waitFor store.load policySetId
session.fsSet = newSet session.fsSet = newSet
session.fsSetId = policySetId session.fsSetId = policySetId
echo idStr, " is new root of ", session.label echo idStr, " is new root of ", session.label

View File

@ -1,3 +1,4 @@
import std/asyncdispatch
import std/xmltree, std/streams, std/strtabs, std/strutils, std/xmlparser, std/tables import std/xmltree, std/streams, std/strtabs, std/strutils, std/xmlparser, std/tables
import genode, genode/parents, genode/servers, genode/roms import genode, genode/parents, genode/servers, genode/roms
import blobsets, blobsets/filestores import blobsets, blobsets/filestores
@ -58,7 +59,7 @@ componentConstructHook = proc(env: GenodeEnv) =
proc readDataspace(romSet: SetId; label: string): DataspaceCapability = proc readDataspace(romSet: SetId; label: string): DataspaceCapability =
let let
name = label.lastLabelElement name = label.lastLabelElement
bs = store.load romSet bs = waitFor store.load romSet
var cap: DataspaceCapability var cap: DataspaceCapability
store.apply(bs, name) do (id: BlobId; size: BiggestInt): store.apply(bs, name) do (id: BlobId; size: BiggestInt):
let let

View File

@ -62,7 +62,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
let (id, size) = waitFor store.ingestFile(path) let (id, size) = waitFor store.ingestFile(path)
path.removePrefix(getCurrentDir()) path.removePrefix(getCurrentDir())
path.removePrefix("/") path.removePrefix("/")
result = insert(store, result, path, id, size) result = waitfor insert(store, result, path, id, size)
writeLine(stdout, id, align($size, 11), " ", path) writeLine(stdout, id, align($size, 11), " ", path)
of pcDir, pcLinkToDir: of pcDir, pcLinkToDir:
for kind, subPath in path.walkDir: for kind, subPath in path.walkDir:
@ -101,7 +101,7 @@ proc checkMain() {.async.} =
let store = openStore() let store = openStore()
for i in 1..args.high: for i in 1..args.high:
try: try:
var bs = store.load(args[i].toBlobId) var bs = await store.load(args[i].toBlobId)
let stream = newMemberStream() let stream = newMemberStream()
asyncCheck stream.streamMembers(store, bs) asyncCheck stream.streamMembers(store, bs)
var m: tuple[key: Key; id: BlobId; size: BiggestInt] var m: tuple[key: Key; id: BlobId; size: BiggestInt]
@ -110,7 +110,7 @@ proc checkMain() {.async.} =
if not valid: break if not valid: break
if m.id.isShortHash: if m.id.isShortHash:
echo m.key, " has a short hash - ", m.id echo m.key, " has a short hash - ", m.id
bs = remove(store, bs, m.key) bs = await remove(store, bs, m.key)
echo "removed ", m.key echo "removed ", m.key
#close(store.openBlobStream(id, size, dataBlob)) #close(store.openBlobStream(id, size, dataBlob))
echo "commit repaired set" echo "commit repaired set"
@ -136,7 +136,7 @@ proc replicateMain() {.async.} =
try: try:
let let
srcId = args[i].toBlobId srcId = args[i].toBlobId
srcSet = src.load(srcId) srcSet = await src.load(srcId)
var dstSet = newBlobSet() var dstSet = newBlobSet()
let stream = newMemberStream() let stream = newMemberStream()
asyncCheck stream.streamMembers(src, srcSet) asyncCheck stream.streamMembers(src, srcSet)
@ -159,7 +159,7 @@ proc replicateMain() {.async.} =
m.id, ":", m.size, " replicated to ", m.id, ":", m.size, " replicated to ",
otherId, ":", otherSize) otherId, ":", otherSize)
quit -1 quit -1
dstSet = insert(dst, dstSet, m.key, m.id, m.size) dstSet = await insert(dst, dstSet, m.key, m.id, m.size)
let let
newSet = await commit(dst, dstSet) newSet = await commit(dst, dstSet)
dstId = newSet.setId dstId = newSet.setId
@ -599,7 +599,7 @@ proc insertFunc(env: Env; args: NodeObj): NodeRef =
trie = args.atom.bs trie = args.atom.bs
blob = args.next.atom blob = args.next.atom
name = args.next.next.atom.str name = args.next.next.atom.str
let newBs = env.store.insert(trie, name, blob.blob, blob.size) let newBs = waitFor env.store.insert(trie, name, blob.blob, blob.size)
doAssert(not newBs.isNil) doAssert(not newBs.isNil)
newNode(newAtom(newBs)) newNode(newAtom(newBs))
@ -619,7 +619,7 @@ proc listFunc(env: Env; args: NodeObj): NodeRef =
proc loadFunc(env: Env; args: NodeObj): NodeRef = proc loadFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 1) assertArgCount(args, 1)
let bs = env.store.load(args.atom.bs.setId) let bs = waitFor env.store.load(args.atom.bs.setId)
bs.newAtom.newNode bs.newAtom.newNode
proc mapFunc(env: Env; args: NodeObj): NodeRef = proc mapFunc(env: Env; args: NodeObj): NodeRef =
@ -660,7 +660,7 @@ proc removeFunc(env: Env; args: NodeObj): NodeRef =
let let
bs = args.atom.bs bs = args.atom.bs
name = args.next.atom.str name = args.next.atom.str
newNode(newAtom(env.store.remove(bs, name))) newNode(newAtom(waitFor env.store.remove(bs, name)))
proc searchFunc(env: Env; args: NodeObj): NodeRef = proc searchFunc(env: Env; args: NodeObj): NodeRef =
assertArgCount(args, 2) assertArgCount(args, 2)

View File

@ -351,10 +351,10 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): Future[BlobSet] {.async.}
else: else:
raise newException(ValueError, "invalid set CBOR") raise newException(ValueError, "invalid set CBOR")
proc load*(store: BlobStore; id: SetId): BlobSet = proc load*(store: BlobStore; id: SetId): Future[BlobSet] =
waitFor loadSet(store, id, 0) loadSet(store, id, 0)
template load*(store: BlobStore; node: BlobSet): BlobSet = proc load*(store: BlobStore; node: BlobSet): Future[BlobSet] =
load(store, node.setId) load(store, node.setId)
proc randomApply*(store: BlobStore; trie: BlobSet; rng: var Rand; proc randomApply*(store: BlobStore; trie: BlobSet; rng: var Rand;
@ -371,7 +371,7 @@ proc randomApply*(store: BlobStore; trie: BlobSet; rng: var Rand;
f(next.blob, next.size) f(next.blob, next.size)
break break
of coldNode: of coldNode:
trie.table[i] = store.load(next) trie.table[i] = waitFor store.load(next)
of hotNode: of hotNode:
trie = next trie = next
i = rng.rand(countSetBits(trie.bitmap)-1) i = rng.rand(countSetBits(trie.bitmap)-1)
@ -389,7 +389,7 @@ proc streamMembers*(stream: FutureStream[tuple[key: Key; id: BlobId; size: Bigge
level = 0 level = 0
rng = initRand(rand(high int)) rng = initRand(rand(high int))
if trie.isCold: if trie.isCold:
path[0].trie = store.load(trie) path[0].trie = await store.load(trie)
else: else:
path[0].trie = trie path[0].trie = trie
path[0].mask = not(0'u64) shr (64 - path[0].trie.table.len) path[0].mask = not(0'u64) shr (64 - path[0].trie.table.len)
@ -411,7 +411,7 @@ proc streamMembers*(stream: FutureStream[tuple[key: Key; id: BlobId; size: Bigge
await stream.write(val) await stream.write(val)
else: else:
if node.isCold: if node.isCold:
node = store.load(node) node = await store.load(node)
inc level inc level
path[level].mask = not (not(0'u64) shl node.table.len) path[level].mask = not (not(0'u64) shl node.table.len)
path[level].trie = node path[level].trie = node
@ -434,7 +434,8 @@ func leafCount(bs: BlobSet): int =
else: else:
result.inc n.leafCount result.inc n.leafCount
proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId = #[
proc search*(store: BlobStore; trie: BlobSet; name: string): Future[BlobId] {.async.} =
let key = name.toKey let key = name.toKey
var var
n = trie n = trie
@ -443,7 +444,7 @@ proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId =
while k != Key(0) and n.masked(k): while k != Key(0) and n.masked(k):
let i = n.compactIndex(k) let i = n.compactIndex(k)
if n.table[i].isCold: if n.table[i].isCold:
n.table[i] = store.load(n.table[i]) n.table[i] = await store.load(n.table[i])
n = n.table[i] n = n.table[i]
if n.kind == leafNode: if n.kind == leafNode:
if n.key == key: if n.key == key:
@ -452,6 +453,7 @@ proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId =
k = k shr keyChunkBits k = k shr keyChunkBits
inc level inc level
raise newException(KeyError, "key not in blob set") raise newException(KeyError, "key not in blob set")
]#
func apply(bs: BlobSet; cb: proc (leaf: BlobSet)) = func apply(bs: BlobSet; cb: proc (leaf: BlobSet)) =
## Apply a callback to each set element. ## Apply a callback to each set element.
@ -475,7 +477,7 @@ proc apply*(store: BlobStore; trie: BlobSet; name: string; f: proc (id: BlobId;
while k != Key(0) and n.masked(k): while k != Key(0) and n.masked(k):
let i = n.compactIndex(k) let i = n.compactIndex(k)
if n.table[i].isCold: if n.table[i].isCold:
n.table[i] = store.load(n.table[i]) n.table[i] = waitFor store.load(n.table[i])
n = n.table[i] n = n.table[i]
if n.kind == leafNode: if n.kind == leafNode:
if n.key == key: if n.key == key:
@ -489,79 +491,80 @@ proc contains*(store: BlobStore; bs: BlobSet; name: string): bool =
found = true found = true
found found
proc insert(store: BlobStore; trie, l: BlobSet; depth: int): BlobSet = proc insert(store: BlobStore; trie, l: BlobSet; depth: int): Future[BlobSet] {.async.} =
## This procedure is recursive to a depth of keyBits/keyChunkBits. ## This procedure is recursive to a depth of keyBits/keyChunkBits.
doAssert(depth < (keyBits div keyChunkBits), "key space exhausted during insert") doAssert(depth < (keyBits div keyChunkBits), "key space exhausted during insert")
result = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table) var bs = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table)
let key = l.key shr (depth * keyChunkBits) let key = l.key shr (depth * keyChunkBits)
if result.masked(key): if bs.masked(key):
let let
depth = depth + 1 depth = depth + 1
i = result.compactIndex(key) i = bs.compactIndex(key)
if result.table[i].isCold: if bs.table[i].isCold:
result.table[i] = store.load(result.table[i]) bs.table[i] = await store.load(bs.table[i])
case result.table[i].kind case bs.table[i].kind
of hotNode: of hotNode:
result.table[i] = insert(store, result.table[i], l, depth) bs.table[i] = await insert(store, bs.table[i], l, depth)
of leafNode: of leafNode:
if result.table[i].key == l.key: if bs.table[i].key == l.key:
result.table[i] = l bs.table[i] = l
else: else:
var subtrie = newBlobSet() var subtrie = newBlobSet()
subtrie = insert(store, subtrie, result.table[i], depth) subtrie = await insert(store, subtrie, bs.table[i], depth)
subtrie = insert(store, subtrie, l, depth) subtrie = await insert(store, subtrie, l, depth)
result.table[i] = subtrie bs.table[i] = subtrie
of coldNode: of coldNode:
discard discard
else: else:
result.bitmap = result.bitmap or key.mask bs.bitmap = bs.bitmap or key.mask
result.table.insert(l, result.compactIndex(key)) bs.table.insert(l, bs.compactIndex(key))
return bs
proc insert*(store: BlobStore; trie, node: BlobSet): BlobSet = proc insert*(store: BlobStore; trie, node: BlobSet): Future[BlobSet] =
## Insert set node `node` into `trie`. ## Insert set node `node` into `trie`.
insert(store, trie, node, 0) insert(store, trie, node, 0)
proc insert*(store: BlobStore; t: BlobSet; key: Key; blob: BlobId; size: BiggestInt): BlobSet = proc insert*(store: BlobStore; t: BlobSet; key: Key; blob: BlobId; size: BiggestInt): Future[BlobSet] =
## Insert a blob hash into a trie. ## Insert a blob hash into a trie.
let leaf = BlobSet(kind: leafNode, key: key, blob: blob, size: size) let leaf = BlobSet(kind: leafNode, key: key, blob: blob, size: size)
insert(store, t, leaf) insert(store, t, leaf)
proc insert*(store: BlobStore; t: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet = proc insert*(store: BlobStore; t: BlobSet; name: string; blob: BlobId; size: BiggestInt): Future[BlobSet] =
insert(store, t, name.toKey, blob, size) insert(store, t, name.toKey, blob, size)
proc remove(store: BlobStore; trie: BlobSet; fullKey: Key; depth: int): BlobSet = proc remove(store: BlobStore; trie: BlobSet; fullKey: Key; depth: int): Future[BlobSet] {.async.} =
result = trie var res = trie
let key = fullKey shr (depth * keyChunkBits) let key = fullKey shr (depth * keyChunkBits)
if result.masked(key): if res.masked(key):
let let
depth = depth + 1 depth = depth + 1
i = result.compactIndex(key) i = res.compactIndex(key)
if result.table[i].isCold: if res.table[i].isCold:
result.table[i] = store.load(result.table[i]) res.table[i] = await store.load(res.table[i])
trie.table[i] = result.table[i] trie.table[i] = res.table[i]
case result.table[i].kind case res.table[i].kind
of hotNode: of hotNode:
result.table[i] = remove(store, result.table[i], fullKey, depth) res.table[i] = await remove(store, res.table[i], fullKey, depth)
of leafNode: of leafNode:
if result.table.len == 2: if res.table.len == 2:
result.table.delete(i) res.table.delete(i)
result = result.table[0] res = res.table[0]
else: else:
result.table.delete(i) res.table.delete(i)
result.bitmap = result.bitmap xor key.mask res.bitmap = res.bitmap xor key.mask
of coldNode: of coldNode:
discard # previously handled discard # previously handled
return res
proc remove*(store: BlobStore; trie: BlobSet; key: Key): BlobSet = proc remove*(store: BlobStore; trie: BlobSet; key: Key): Future[BlobSet] =
## Remove a blob from a trie. ## Remove a blob from a trie.
if trie.isEmpty: if trie.isEmpty:
result = trie result = newFuture[BlobSet]()
result.complete trie
else: else:
result = remove(store, trie, key, 0) result = remove(store, trie, key, 0)
if result.isNil:
result = newBlobSet()
proc remove*(store: BlobStore; trie: BlobSet; name: string): BlobSet = proc remove*(store: BlobStore; trie: BlobSet; name: string): Future[BlobSet] =
remove(store, trie, name.toKey) remove(store, trie, name.toKey)
proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet = proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
@ -569,7 +572,7 @@ proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet =
# TODO: lazy-load set # TODO: lazy-load set
var fresh = newBlobSet() var fresh = newBlobSet()
proc freshInsert(leaf: BlobSet) = proc freshInsert(leaf: BlobSet) =
fresh = insert(store, fresh, leaf) fresh = waitFor insert(store, fresh, leaf)
for bs in sets: for bs in sets:
assert(not bs.isnil) assert(not bs.isnil)
bs.apply(freshInsert) bs.apply(freshInsert)

View File

@ -31,7 +31,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
let (id, size) = waitFor store.ingestFile(path) let (id, size) = waitFor store.ingestFile(path)
path.removePrefix(getCurrentDir()) path.removePrefix(getCurrentDir())
path.removePrefix("/") path.removePrefix("/")
result = insert(store, result, path, id, size) result = waitFor insert(store, result, path, id, size)
writeLine(stdout, id, align($size, 11), " ", path) writeLine(stdout, id, align($size, 11), " ", path)
of pcDir, pcLinkToDir: of pcDir, pcLinkToDir:
for kind, subPath in path.walkDir: for kind, subPath in path.walkDir:

View File

@ -43,17 +43,20 @@ suite "store":
name = $i name = $i
blob = waitFor client.ingest(newString(i)) blob = waitFor client.ingest(newString(i))
echo "insert ", blob, " ", i echo "insert ", blob, " ", i
bs = insert(client, bs, name, blob, i) bs = waitFor insert(client, bs, name, blob, i)
setId = (waitFor commit(client, bs)).setId setId = (waitFor commit(client, bs)).setId
test "load": test "load":
bs = load(client, setId) bs = waitFor load(client, setId)
for i in 1..count: for i in 1..count:
let let
name = $i name = $i
blob = blobHash newString(i) blob = blobHash newString(i)
other = search(store, bs, name) var found = false
#doAssert(other == blob) apply(client, bs, name) do (id: BlobId; size: BiggestInt):
doAssert(id == blob)
found = true
doAssert(found)
for i in 1..count: for i in 1..count:
let let
i = i and 0x8000 i = i and 0x8000

View File

@ -1,4 +1,4 @@
import std/unittest, std/os, std/parseopt import std/asyncdispatch, std/unittest, std/os, std/parseopt
import ../src/blobsets import ../src/blobsets
@ -21,15 +21,18 @@ suite "Blob set tests":
blob = randomCid blob = randomCid
str = $randomCid str = $randomCid
doAssert(str.toBlobid == randomCid) doAssert(str.toBlobid == randomCid)
result = insert(store, s, path, blob, 0) result = waitFor insert(store, s, path, blob, 0)
let found = search(store, result, path) var found = false
doAssert(found == randomCid) apply(store, result, path) do (id: BlobId; size: BiggestInt):
doAssert(id == randomCid)
found = true
doAssert(found)
test "functional insert": test "functional insert":
let let
a = newBlobSet() a = newBlobSet()
b = insert(store, a, "foo", randomCid, 0) b = waitFor insert(store, a, "foo", randomCid, 0)
c = insert(store, b, "bar", randomCid, 0) c = waitFor insert(store, b, "bar", randomCid, 0)
doAssert(contains(store, b, "foo")) doAssert(contains(store, b, "foo"))
doAssert(contains(store, c, "foo")) doAssert(contains(store, c, "foo"))
doAssert(contains(store, c, "bar")) doAssert(contains(store, c, "bar"))
@ -43,13 +46,13 @@ suite "Blob set tests":
let let
name = $i name = $i
blob = blobHash name blob = blobHash name
bs = insert(store, bs, name, blob, 0) bs = waitFor insert(store, bs, name, blob, 0)
for i in 1..1024: for i in 1..1024:
let let
name = $i name = $i
blob = blobHash name blob = blobHash name
other = search(store, bs, name) apply(store, bs, name) do (id: BlobId; size: BiggestInt):
doAssert(other == blob) doAssert(id == blob)
test "remove": test "remove":
var bs = newBlobSet() var bs = newBlobSet()
@ -57,10 +60,10 @@ suite "Blob set tests":
let let
name = $i name = $i
blob = blobHash name blob = blobHash name
bs = insert(store, bs, name, blob, 0) bs = waitFor insert(store, bs, name, blob, 0)
for i in 1..1024: for i in 1..1024:
let name = $i let name = $i
bs = remove(store, bs, name) bs = waitFor remove(store, bs, name)
doAssert(not contains(store, bs, name)) doAssert(not contains(store, bs, name))
test "sets": test "sets":