diff --git a/genode/blobsets_genode.nimble b/genode/blobsets_genode.nimble index 1635da8..37ebdfe 100644 --- a/genode/blobsets_genode.nimble +++ b/genode/blobsets_genode.nimble @@ -22,3 +22,4 @@ requires "nim >= 0.19.0", "blobsets", "genode" task genode, "Build for Genode": exec "nimble build --os:genode -d:posix" + diff --git a/genode/src/blobsets_fs.nim b/genode/src/blobsets_fs.nim index fc18b66..045d12b 100644 --- a/genode/src/blobsets_fs.nim +++ b/genode/src/blobsets_fs.nim @@ -1,3 +1,4 @@ +import std/asyncdispatch import std/tables, std/xmltree, std/strtabs, std/strutils, std/streams, std/xmlparser import genode, genode/signals, genode/parents, genode/servers, genode/roms @@ -238,7 +239,7 @@ proc processPacket(session: SessionRef; pkt: var FsPacket) = pkt.succeeded true of fileNode: node.stream.pos = pkt.position.int - let n = node.stream.read(pktBuf, pkt.len) + let n = waitFor node.stream.read(pktBuf, pkt.len) pkt.setLen n pkt.succeeded true else: diff --git a/src/blobset.nim b/src/blobset.nim index 295a04d..3c239e9 100644 --- a/src/blobset.nim +++ b/src/blobset.nim @@ -56,7 +56,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string case kind of pcFile, pcLinkToFile: var path = normalizedPath path - let (id, size) = store.ingestFile(path) + let (id, size) = waitFor store.ingestFile(path) path.removePrefix(getCurrentDir()) path.removePrefix("/") result = result.insert(path, id, size) @@ -234,7 +234,7 @@ template returnError(n: NodeObj) = proc getBlob(env: Env; path: string): tuple[id: BlobId, size: BiggestInt] = result = env.blobs.getOrDefault(path) if result.size == 0: - result = env.store.ingestFile(path) + result = waitFor env.store.ingestFile(path) if result.size != 0: env.blobs[path] = result diff --git a/src/blobsets.nim b/src/blobsets.nim index f3c081e..c976177 100644 --- a/src/blobsets.nim +++ b/src/blobsets.nim @@ -411,7 +411,7 @@ type IngestStream* = ref IngestStreamObj IngestStreamObj* = object of RootObj cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.} - finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.} + finishImpl*: proc (s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.nimcall, gcsafe.} ingestImpl*: proc (s: IngestStream; buf: pointer; size: int): Future[void] {.nimcall, gcsafe.} proc close*(s: BlobStream) = @@ -430,29 +430,29 @@ proc pos*(s: BlobStream): BiggestInt = assert(not s.getPosImpl.isNil) s.getPosImpl(s) -proc read*(s: BlobStream; buf: pointer; len: Natural): int = +proc read*(s: BlobStream; buf: pointer; len: Natural): Future[int] = assert(not s.readImpl.isNil) - waitFor s.readImpl(s, buf, len) + s.readImpl(s, buf, len) proc cancle*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = ## Cancel and close ingest stream assert(not s.cancelImpl.isNil) s.cancelImpl(s) -proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = +proc finish*(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] = ## Finish ingest stream assert(not s.finishImpl.isNil) s.finishImpl(s) -proc ingest*(s: IngestStream; buf: pointer; size: Natural) = +proc ingest*(s: IngestStream; buf: pointer; size: Natural): Future[void] = ## Ingest stream assert(not s.ingestImpl.isNil) - waitFor s.ingestImpl(s, buf, size) + s.ingestImpl(s, buf, size) -proc ingest*(s: IngestStream; buf: string) = +proc ingest*(s: IngestStream; buf: string): Future[void] = ## Ingest stream assert(not s.ingestImpl.isNil) - waitFor s.ingestImpl(s, buf[0].unsafeAddr, buf.len) + s.ingestImpl(s, buf[0].unsafeAddr, buf.len) type BlobStore* = ref BlobStoreObj @@ -488,12 +488,15 @@ proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKi getPosImpl: getPosNull, readImpl: nullBlobRead) -proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = +proc nullFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] = var s = NullIngestStream(s) s.leaves.add finish(s.ctx) compressTree(s.leaves) - result.id = s.leaves[0] - result.size = s.pos + var pair: tuple[id: BlobId, size: BiggestInt] + pair.id = s.leaves[0] + pair.size = s.pos + result = newFuture[tuple[id: BlobId, size: BiggestInt]]() + complete result, pair proc nullIngest(s: IngestStream; buf: pointer; len: Natural): Future[void] = var @@ -551,13 +554,13 @@ iterator dumpBlob*(store: BlobStore; id: BlobId): string = close stream while true: buf.setLen(blobLeafSize) - let n = stream.read(buf[0].addr, buf.len) + let n = waitFor stream.read(buf[0].addr, buf.len) if n == 0: break buf.setLen(n) yield buf -proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet = +proc loadSet(store: BlobStore; id: SetId; depth: int): Future[BlobSet] {.async.} = if Key.high shr depth == 0: raiseAssert("loadSet trie is too deep") var @@ -565,7 +568,7 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet = buf = newString(blobLeafSize) defer: close stream - let n = stream.read(buf[0].addr, buf.len) + let n = await stream.read(buf[0].addr, buf.len) buf.setLen(n) let c = buf.parseCbor.val @@ -581,7 +584,8 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet = let node = c[i].val case c[i].tag.int of nodeTag: - result.table.add loadSet(store, node.toSetId, depth+1) + let child = await loadSet(store, node.toSetId, depth+1) + result.table.add child of leafTag: let leaf = BlobSet( @@ -594,7 +598,7 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet = raise newException(ValueError, "invalid set CBOR") proc loadSet*(store: BlobStore; id: SetId): BlobSet = - loadSet store, id, 0 + waitFor loadSet(store, id, 0) proc commit*(store: BlobStore; bs: BlobSet): BlobSet = assert(bs.kind == hotNode) @@ -605,8 +609,8 @@ proc commit*(store: BlobStore; bs: BlobSet): BlobSet = e = store.commit e let stream = store.openIngestStream(kind=metaBlob) var buf = encode bs.toCbor - stream.ingest(buf) - let (id, _) = finish stream + waitFor stream.ingest(buf) + let (id, _) = waitFor finish(stream) result = BlobSet(kind: coldNode, setId: id) proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) = diff --git a/src/blobsets/filestores.nim b/src/blobsets/filestores.nim index 3a89957..e209e3c 100644 --- a/src/blobsets/filestores.nim +++ b/src/blobsets/filestores.nim @@ -4,7 +4,7 @@ import std/asyncfile, std/asyncdispatch, std/os import nimcrypto/blake2 -proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] = +proc ingestFile*(store: BlobStore; path: string): Future[tuple[id: BlobId, size: BiggestInt]] {.async.} = ## Ingest a file and return blob metadata. let file = openAsync(path, fmRead) @@ -15,10 +15,10 @@ proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: Bigges if fileSize > 0: var buf = newString(min(blobLeafSize, fileSize)) while true: - let n = waitFor file.readBuffer(buf[0].addr, buf.len) + let n = await file.readBuffer(buf[0].addr, buf.len) if n == 0: break - stream.ingest(buf[0].addr, n) - result = finish stream + await stream.ingest(buf[0].addr, n) + return await finish stream type FsBlobStream = ref FsBlobStreamObj @@ -71,14 +71,22 @@ proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind except: raise newException(KeyError, "blob not in file-system store") -proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = - var s = FsIngestStream(s) +proc fsFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] = + var + s = FsIngestStream(s) + pair: tuple[id: BlobId, size: BiggestInt] close s.file s.leaves.add finish(s.ctx) compressTree(s.leaves) - result.id = s.leaves[0] - result.size = s.pos - moveFile(s.path, s.path.parentDir / result.id.toHex) + pair.id = s.leaves[0] + pair.size = s.pos + let finalPath = s.path.parentDir / pair.id.toHex + if fileExists finalPath: + removeFile s.path + else: + moveFile(s.path, finalPath) + result = newFuture[tuple[id: BlobId, size: BiggestInt]]() + complete result, pair proc fsIngest(s: IngestStream; buf: pointer; len: Natural) {.async.} = var diff --git a/src/blobsets/httpservers.nim b/src/blobsets/httpservers.nim index 2f2cafb..5bf9475 100644 --- a/src/blobsets/httpservers.nim +++ b/src/blobsets/httpservers.nim @@ -53,7 +53,7 @@ proc head(hss: HttpStoreServer; req: Request): Future[void] = # cache the stream in the blob table or raise an exception req.respond(Http200, "") -proc get(hss: HttpStoreServer; req: Request): Future[void] = +proc get(hss: HttpStoreServer; req: Request) {.async.} = let stream = hss.blobStream(req.url.path) var pos: BiggestInt @@ -66,17 +66,17 @@ proc get(hss: HttpStoreServer; req: Request): Future[void] = len = endPos - startPos stream.pos = pos var body = newString(len) - len = stream.read(body[0].addr, len) + len = await stream.read(body[0].addr, len) body.setLen len let headers = newHttpHeaders({"Range": "bytes=$1-$2" % [ $pos, $(pos+len) ]}) - req.respond(Http206, body, headers) + await req.respond(Http206, body, headers) -proc putIngest(hss: HttpStoreServer; req: Request): Future[void] = +proc putIngest(hss: HttpStoreServer; req: Request) {.async.} = let stream = hss.ingests[req.url.path] - stream.ingest(req.body) - req.respond(Http204, "") + await stream.ingest(req.body) + await req.respond(Http204, "") -proc postIngest(hss: HttpStoreServer; req: Request): Future[void] = +proc postIngest(hss: HttpStoreServer; req: Request) {.async.} = case req.url.path of "/ingest/" & $dataBlob: let @@ -84,24 +84,24 @@ proc postIngest(hss: HttpStoreServer; req: Request): Future[void] = key = "/ingest/" & $hss.rng.next headers = newHttpHeaders({"Location": key}) hss.ingests[key] = hss.store.openIngestStream(size, dataBlob) - req.respond(Http201, "", headers) + await req.respond(Http201, "", headers) of "/ingest/" & $metaBlob: let size = parseBiggestInt req.headers["ingest-size"] key = "/ingest/" & $hss.rng.next headers = newHttpHeaders({"Location": key}) hss.ingests[key] = hss.store.openIngestStream(size, metaBlob) - req.respond(Http201, "", headers) + await req.respond(Http201, "", headers) else: let stream = hss.ingests[req.url.path] - (blob, size) = finish stream + (blob, size) = await finish stream headers = newHttpHeaders({ "blob-id": blob.toHex, "blob-size": $size, }) hss.ingests.del req.url.path - req.respond(Http204, "", headers) + await req.respond(Http204, "", headers) proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] = ## Serve requests to HTTP store. diff --git a/src/blobsets/httpstores.nim b/src/blobsets/httpstores.nim index c44dff5..dc09a97 100644 --- a/src/blobsets/httpstores.nim +++ b/src/blobsets/httpstores.nim @@ -67,13 +67,16 @@ proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: Bl raise newException(KeyError, "unrecognized " & $resp.code & " response from HTTP store: " & resp.status) -proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = - var s = HttpIngestStream(s) - let resp = waitFor s.client.request($s.url, HttpPOST) +proc httpFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.async.} = + var + s = HttpIngestStream(s) + pair: tuple[id: BlobId, size: BiggestInt] + let resp = await s.client.request($s.url, HttpPOST) if not resp.code.is2xx: raiseAssert(resp.status) - result.id = toBlobId resp.headers["blob-id"] - result.size = parseBiggestInt resp.headers["blob-size"] + pair.id = toBlobId resp.headers["blob-id"] + pair.size = parseBiggestInt resp.headers["blob-size"] + return pair proc httpIngest(x: IngestStream; buf: pointer; len: Natural) {.async.} = var diff --git a/src/blobsets/spryblobs.nim b/src/blobsets/spryblobs.nim index e232857..cf2cc59 100644 --- a/src/blobsets/spryblobs.nim +++ b/src/blobsets/spryblobs.nim @@ -1,3 +1,4 @@ +import asyncdispatch import ../blobsets, ./filestores, ./httpstores import spryvm/spryvm import cbor @@ -27,7 +28,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string case kind of pcFile, pcLinkToFile: var path = normalizedPath path - let (id, size) = store.ingestFile(path) + let (id, size) = waitFor store.ingestFile(path) path.removePrefix(getCurrentDir()) path.removePrefix("/") result = result.insert(path, id, size)