From 49ca99b5573998fa1d1ec66cb08d681c12725df6 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Mon, 18 Mar 2019 22:50:00 +0100 Subject: [PATCH] More futures --- genode/blobsets/src/blobsets_fs.nim | 4 +- genode/blobsets/src/blobsets_rom.nim | 3 +- src/blobset.nim | 16 ++--- src/blobsets.nim | 97 ++++++++++++++-------------- src/blobsets/spryblobs.nim | 2 +- tests/test_http.nim | 11 ++-- tests/test_set.nim | 25 +++---- 7 files changed, 84 insertions(+), 74 deletions(-) diff --git a/genode/blobsets/src/blobsets_fs.nim b/genode/blobsets/src/blobsets_fs.nim index e7779a3..fb1a1cf 100644 --- a/genode/blobsets/src/blobsets_fs.nim +++ b/genode/blobsets/src/blobsets_fs.nim @@ -302,7 +302,7 @@ componentConstructHook = proc(env: GenodeEnv) = proc createSession(env: GenodeEnv; store: BlobStore; id: ServerId; label: string; setId: SetId; txBufSize: int) = let - fsSet = store.load setId + fsSet = waitFor store.load setId session = env.newSession(store, label, setId, fsSet, txBufSize) cap = env.ep.manage session sessions[id] = session @@ -373,7 +373,7 @@ componentConstructHook = proc(env: GenodeEnv) = idStr = pAttrs["root"] policySetId = toSetId idStr if session.fsSetId != policySetId: - let newSet = store.load policySetId + let newSet = waitFor store.load policySetId session.fsSet = newSet session.fsSetId = policySetId echo idStr, " is new root of ", session.label diff --git a/genode/blobsets/src/blobsets_rom.nim b/genode/blobsets/src/blobsets_rom.nim index da9a391..1bea81e 100644 --- a/genode/blobsets/src/blobsets_rom.nim +++ b/genode/blobsets/src/blobsets_rom.nim @@ -1,3 +1,4 @@ +import std/asyncdispatch import std/xmltree, std/streams, std/strtabs, std/strutils, std/xmlparser, std/tables import genode, genode/parents, genode/servers, genode/roms import blobsets, blobsets/filestores @@ -58,7 +59,7 @@ componentConstructHook = proc(env: GenodeEnv) = proc readDataspace(romSet: SetId; label: string): DataspaceCapability = let name = label.lastLabelElement - bs = store.load romSet + bs = waitFor store.load romSet var cap: DataspaceCapability store.apply(bs, name) do (id: BlobId; size: BiggestInt): let diff --git a/src/blobset.nim b/src/blobset.nim index 7796c2d..6383b2c 100644 --- a/src/blobset.nim +++ b/src/blobset.nim @@ -62,7 +62,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string let (id, size) = waitFor store.ingestFile(path) path.removePrefix(getCurrentDir()) path.removePrefix("/") - result = insert(store, result, path, id, size) + result = waitfor insert(store, result, path, id, size) writeLine(stdout, id, align($size, 11), " ", path) of pcDir, pcLinkToDir: for kind, subPath in path.walkDir: @@ -101,7 +101,7 @@ proc checkMain() {.async.} = let store = openStore() for i in 1..args.high: try: - var bs = store.load(args[i].toBlobId) + var bs = await store.load(args[i].toBlobId) let stream = newMemberStream() asyncCheck stream.streamMembers(store, bs) var m: tuple[key: Key; id: BlobId; size: BiggestInt] @@ -110,7 +110,7 @@ proc checkMain() {.async.} = if not valid: break if m.id.isShortHash: echo m.key, " has a short hash - ", m.id - bs = remove(store, bs, m.key) + bs = await remove(store, bs, m.key) echo "removed ", m.key #close(store.openBlobStream(id, size, dataBlob)) echo "commit repaired set" @@ -136,7 +136,7 @@ proc replicateMain() {.async.} = try: let srcId = args[i].toBlobId - srcSet = src.load(srcId) + srcSet = await src.load(srcId) var dstSet = newBlobSet() let stream = newMemberStream() asyncCheck stream.streamMembers(src, srcSet) @@ -159,7 +159,7 @@ 
proc replicateMain() {.async.} = m.id, ":", m.size, " replicated to ", otherId, ":", otherSize) quit -1 - dstSet = insert(dst, dstSet, m.key, m.id, m.size) + dstSet = await insert(dst, dstSet, m.key, m.id, m.size) let newSet = await commit(dst, dstSet) dstId = newSet.setId @@ -599,7 +599,7 @@ proc insertFunc(env: Env; args: NodeObj): NodeRef = trie = args.atom.bs blob = args.next.atom name = args.next.next.atom.str - let newBs = env.store.insert(trie, name, blob.blob, blob.size) + let newBs = waitFor env.store.insert(trie, name, blob.blob, blob.size) doAssert(not newBs.isNil) newNode(newAtom(newBs)) @@ -619,7 +619,7 @@ proc listFunc(env: Env; args: NodeObj): NodeRef = proc loadFunc(env: Env; args: NodeObj): NodeRef = assertArgCount(args, 1) - let bs = env.store.load(args.atom.bs.setId) + let bs = waitFor env.store.load(args.atom.bs.setId) bs.newAtom.newNode proc mapFunc(env: Env; args: NodeObj): NodeRef = @@ -660,7 +660,7 @@ proc removeFunc(env: Env; args: NodeObj): NodeRef = let bs = args.atom.bs name = args.next.atom.str - newNode(newAtom(env.store.remove(bs, name))) + newNode(newAtom(waitFor env.store.remove(bs, name))) proc searchFunc(env: Env; args: NodeObj): NodeRef = assertArgCount(args, 2) diff --git a/src/blobsets.nim b/src/blobsets.nim index 2d57c32..ee318a1 100644 --- a/src/blobsets.nim +++ b/src/blobsets.nim @@ -351,10 +351,10 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): Future[BlobSet] {.async.} else: raise newException(ValueError, "invalid set CBOR") -proc load*(store: BlobStore; id: SetId): BlobSet = - waitFor loadSet(store, id, 0) +proc load*(store: BlobStore; id: SetId): Future[BlobSet] = + loadSet(store, id, 0) -template load*(store: BlobStore; node: BlobSet): BlobSet = +proc load*(store: BlobStore; node: BlobSet): Future[BlobSet] = load(store, node.setId) proc randomApply*(store: BlobStore; trie: BlobSet; rng: var Rand; @@ -371,7 +371,7 @@ proc randomApply*(store: BlobStore; trie: BlobSet; rng: var Rand; f(next.blob, next.size) break of coldNode: - trie.table[i] = store.load(next) + trie.table[i] = waitFor store.load(next) of hotNode: trie = next i = rng.rand(countSetBits(trie.bitmap)-1) @@ -389,7 +389,7 @@ proc streamMembers*(stream: FutureStream[tuple[key: Key; id: BlobId; size: Bigge level = 0 rng = initRand(rand(high int)) if trie.isCold: - path[0].trie = store.load(trie) + path[0].trie = await store.load(trie) else: path[0].trie = trie path[0].mask = not(0'u64) shr (64 - path[0].trie.table.len) @@ -411,7 +411,7 @@ proc streamMembers*(stream: FutureStream[tuple[key: Key; id: BlobId; size: Bigge await stream.write(val) else: if node.isCold: - node = store.load(node) + node = await store.load(node) inc level path[level].mask = not (not(0'u64) shl node.table.len) path[level].trie = node @@ -434,7 +434,8 @@ func leafCount(bs: BlobSet): int = else: result.inc n.leafCount -proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId = +#[ +proc search*(store: BlobStore; trie: BlobSet; name: string): Future[BlobId] {.async.} = let key = name.toKey var n = trie @@ -443,7 +444,7 @@ proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId = while k != Key(0) and n.masked(k): let i = n.compactIndex(k) if n.table[i].isCold: - n.table[i] = store.load(n.table[i]) + n.table[i] = await store.load(n.table[i]) n = n.table[i] if n.kind == leafNode: if n.key == key: @@ -452,6 +453,7 @@ proc search*(store: BlobStore; trie: BlobSet; name: string): BlobId = k = k shr keyChunkBits inc level raise newException(KeyError, "key not in blob set") +]# func apply(bs: 
BlobSet; cb: proc (leaf: BlobSet)) = ## Apply a callback to each set element. @@ -475,7 +477,7 @@ proc apply*(store: BlobStore; trie: BlobSet; name: string; f: proc (id: BlobId; while k != Key(0) and n.masked(k): let i = n.compactIndex(k) if n.table[i].isCold: - n.table[i] = store.load(n.table[i]) + n.table[i] = waitFor store.load(n.table[i]) n = n.table[i] if n.kind == leafNode: if n.key == key: @@ -489,79 +491,80 @@ proc contains*(store: BlobStore; bs: BlobSet; name: string): bool = found = true found -proc insert(store: BlobStore; trie, l: BlobSet; depth: int): BlobSet = +proc insert(store: BlobStore; trie, l: BlobSet; depth: int): Future[BlobSet] {.async.} = ## This procedure is recursive to a depth of keyBits/keyChunkBits. doAssert(depth < (keyBits div keyChunkBits), "key space exhausted during insert") - result = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table) + var bs = BlobSet(kind: hotNode, bitmap: trie.bitmap, table: trie.table) let key = l.key shr (depth * keyChunkBits) - if result.masked(key): + if bs.masked(key): let depth = depth + 1 - i = result.compactIndex(key) - if result.table[i].isCold: - result.table[i] = store.load(result.table[i]) - case result.table[i].kind + i = bs.compactIndex(key) + if bs.table[i].isCold: + bs.table[i] = await store.load(bs.table[i]) + case bs.table[i].kind of hotNode: - result.table[i] = insert(store, result.table[i], l, depth) + bs.table[i] = await insert(store, bs.table[i], l, depth) of leafNode: - if result.table[i].key == l.key: - result.table[i] = l + if bs.table[i].key == l.key: + bs.table[i] = l else: var subtrie = newBlobSet() - subtrie = insert(store, subtrie, result.table[i], depth) - subtrie = insert(store, subtrie, l, depth) - result.table[i] = subtrie + subtrie = await insert(store, subtrie, bs.table[i], depth) + subtrie = await insert(store, subtrie, l, depth) + bs.table[i] = subtrie of coldNode: discard else: - result.bitmap = result.bitmap or key.mask - result.table.insert(l, result.compactIndex(key)) + bs.bitmap = bs.bitmap or key.mask + bs.table.insert(l, bs.compactIndex(key)) + return bs -proc insert*(store: BlobStore; trie, node: BlobSet): BlobSet = +proc insert*(store: BlobStore; trie, node: BlobSet): Future[BlobSet] = ## Insert set node `node` into `trie`. insert(store, trie, node, 0) -proc insert*(store: BlobStore; t: BlobSet; key: Key; blob: BlobId; size: BiggestInt): BlobSet = +proc insert*(store: BlobStore; t: BlobSet; key: Key; blob: BlobId; size: BiggestInt): Future[BlobSet] = ## Insert a blob hash into a trie. 
let leaf = BlobSet(kind: leafNode, key: key, blob: blob, size: size) insert(store, t, leaf) -proc insert*(store: BlobStore; t: BlobSet; name: string; blob: BlobId; size: BiggestInt): BlobSet = +proc insert*(store: BlobStore; t: BlobSet; name: string; blob: BlobId; size: BiggestInt): Future[BlobSet] = insert(store, t, name.toKey, blob, size) -proc remove(store: BlobStore; trie: BlobSet; fullKey: Key; depth: int): BlobSet = - result = trie +proc remove(store: BlobStore; trie: BlobSet; fullKey: Key; depth: int): Future[BlobSet] {.async.} = + var res = trie let key = fullKey shr (depth * keyChunkBits) - if result.masked(key): + if res.masked(key): let depth = depth + 1 - i = result.compactIndex(key) - if result.table[i].isCold: - result.table[i] = store.load(result.table[i]) - trie.table[i] = result.table[i] - case result.table[i].kind + i = res.compactIndex(key) + if res.table[i].isCold: + res.table[i] = await store.load(res.table[i]) + trie.table[i] = res.table[i] + case res.table[i].kind of hotNode: - result.table[i] = remove(store, result.table[i], fullKey, depth) + res.table[i] = await remove(store, res.table[i], fullKey, depth) of leafNode: - if result.table.len == 2: - result.table.delete(i) - result = result.table[0] + if res.table.len == 2: + res.table.delete(i) + res = res.table[0] else: - result.table.delete(i) - result.bitmap = result.bitmap xor key.mask + res.table.delete(i) + res.bitmap = res.bitmap xor key.mask of coldNode: discard # previously handled + return res -proc remove*(store: BlobStore; trie: BlobSet; key: Key): BlobSet = +proc remove*(store: BlobStore; trie: BlobSet; key: Key): Future[BlobSet] = ## Remove a blob from a trie. if trie.isEmpty: - result = trie + result = newFuture[BlobSet]() + result.complete trie else: result = remove(store, trie, key, 0) - if result.isNil: - result = newBlobSet() -proc remove*(store: BlobStore; trie: BlobSet; name: string): BlobSet = +proc remove*(store: BlobStore; trie: BlobSet; name: string): Future[BlobSet] = remove(store, trie, name.toKey) proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet = @@ -569,7 +572,7 @@ proc union*(store: BlobStore; sets: varargs[BlobSet]): BlobSet = # TODO: lazy-load set var fresh = newBlobSet() proc freshInsert(leaf: BlobSet) = - fresh = insert(store, fresh, leaf) + fresh = waitFor insert(store, fresh, leaf) for bs in sets: assert(not bs.isnil) bs.apply(freshInsert) diff --git a/src/blobsets/spryblobs.nim b/src/blobsets/spryblobs.nim index 842b831..5c2bd96 100644 --- a/src/blobsets/spryblobs.nim +++ b/src/blobsets/spryblobs.nim @@ -31,7 +31,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string let (id, size) = waitFor store.ingestFile(path) path.removePrefix(getCurrentDir()) path.removePrefix("/") - result = insert(store, result, path, id, size) + result = waitFor insert(store, result, path, id, size) writeLine(stdout, id, align($size, 11), " ", path) of pcDir, pcLinkToDir: for kind, subPath in path.walkDir: diff --git a/tests/test_http.nim b/tests/test_http.nim index a4ca46c..b05ba89 100644 --- a/tests/test_http.nim +++ b/tests/test_http.nim @@ -43,17 +43,20 @@ suite "store": name = $i blob = waitFor client.ingest(newString(i)) echo "insert ", blob, " ", i - bs = insert(client, bs, name, blob, i) + bs = waitFor insert(client, bs, name, blob, i) setId = (waitFor commit(client, bs)).setId test "load": - bs = load(client, setId) + bs = waitFor load(client, setId) for i in 1..count: let name = $i blob = blobHash newString(i) - other = search(store, bs, name) - 
#doAssert(other == blob) + var found = false + apply(client, bs, name) do (id: BlobId; size: BiggestInt): + doAssert(id == blob) + found = true + doAssert(found) for i in 1..count: let i = i and 0x8000 diff --git a/tests/test_set.nim b/tests/test_set.nim index 7ca8be9..d089e5a 100644 --- a/tests/test_set.nim +++ b/tests/test_set.nim @@ -1,4 +1,4 @@ -import std/unittest, std/os, std/parseopt +import std/asyncdispatch, std/unittest, std/os, std/parseopt import ../src/blobsets @@ -21,15 +21,18 @@ suite "Blob set tests": blob = randomCid str = $randomCid doAssert(str.toBlobid == randomCid) - result = insert(store, s, path, blob, 0) - let found = search(store, result, path) - doAssert(found == randomCid) + result = waitFor insert(store, s, path, blob, 0) + var found = false + apply(store, result, path) do (id: BlobId; size: BiggestInt): + doAssert(id == randomCid) + found = true + doAssert(found) test "functional insert": let a = newBlobSet() - b = insert(store, a, "foo", randomCid, 0) - c = insert(store, b, "bar", randomCid, 0) + b = waitFor insert(store, a, "foo", randomCid, 0) + c = waitFor insert(store, b, "bar", randomCid, 0) doAssert(contains(store, b, "foo")) doAssert(contains(store, c, "foo")) doAssert(contains(store, c, "bar")) @@ -43,13 +46,13 @@ suite "Blob set tests": let name = $i blob = blobHash name - bs = insert(store, bs, name, blob, 0) + bs = waitFor insert(store, bs, name, blob, 0) for i in 1..1024: let name = $i blob = blobHash name - other = search(store, bs, name) - doAssert(other == blob) + apply(store, bs, name) do (id: BlobId; size: BiggestInt): + doAssert(id == blob) test "remove": var bs = newBlobSet() @@ -57,10 +60,10 @@ suite "Blob set tests": let name = $i blob = blobHash name - bs = insert(store, bs, name, blob, 0) + bs = waitFor insert(store, bs, name, blob, 0) for i in 1..1024: let name = $i - bs = remove(store, bs, name) + bs = waitFor remove(store, bs, name) doAssert(not contains(store, bs, name)) test "sets":
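
Usage sketch: with this change, load, insert and remove return Future[BlobSet], so call
sites either await them inside {.async.} procs or wrap them in waitFor at blocking entry
points, as the updated tests do. The identifiers below (BlobStore, newBlobSet, blobHash,
insert, commit, load, remove) are taken from the diff above; how the store is opened is
backend-specific and elided here, and the literal name and size values are illustrative only.

    import std/asyncdispatch
    import blobsets  # imported as ../src/blobsets in the tests

    proc demo(store: BlobStore) {.async.} =
      var bs = newBlobSet()
      let blob = blobHash "example"
      # insert, load and remove now return Future[BlobSet], so they are awaited here
      bs = await insert(store, bs, "example", blob, 7)
      let committed = await commit(store, bs)
      bs = await store.load(committed.setId)
      bs = await remove(store, bs, "example")

    # Outside an {.async.} proc, drive the dispatcher with waitFor instead, e.g.:
    #   waitFor demo(store)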