more async

Ehmry - 2019-02-10 13:47:40 +01:00
parent a315eb645a
commit 3da2accfd5
8 changed files with 65 additions and 47 deletions

View File

@@ -22,3 +22,4 @@ requires "nim >= 0.19.0", "blobsets", "genode"
task genode, "Build for Genode":
exec "nimble build --os:genode -d:posix"

View File

@@ -1,3 +1,4 @@
+import std/asyncdispatch
 import std/tables, std/xmltree, std/strtabs, std/strutils, std/streams, std/xmlparser
 import genode, genode/signals, genode/parents, genode/servers, genode/roms
@@ -238,7 +239,7 @@ proc processPacket(session: SessionRef; pkt: var FsPacket) =
     pkt.succeeded true
   of fileNode:
     node.stream.pos = pkt.position.int
-    let n = node.stream.read(pktBuf, pkt.len)
+    let n = waitFor node.stream.read(pktBuf, pkt.len)
     pkt.setLen n
     pkt.succeeded true
   else:
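The Genode session handler stays synchronous, so it drains the now Future-returning read with waitFor on the spot. A minimal sketch of that pattern, with hypothetical names standing in for the session code:

  import std/asyncdispatch

  # Stand-in for BlobStream.read, which now returns Future[int]
  # instead of blocking; here it completes immediately.
  proc asyncRead(buf: pointer; len: Natural): Future[int] =
    result = newFuture[int]("asyncRead")
    result.complete(len.int)

  proc handlePacket(buf: pointer; len: Natural): int =
    # waitFor runs the dispatcher until the future resolves,
    # keeping the synchronous packet handler unchanged otherwise
    waitFor asyncRead(buf, len)

  var pktBuf = newString(16)
  echo handlePacket(pktBuf[0].addr, pktBuf.len)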

View File

@@ -56,7 +56,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
   case kind
   of pcFile, pcLinkToFile:
     var path = normalizedPath path
-    let (id, size) = store.ingestFile(path)
+    let (id, size) = waitFor store.ingestFile(path)
     path.removePrefix(getCurrentDir())
     path.removePrefix("/")
     result = result.insert(path, id, size)
@@ -234,7 +234,7 @@ template returnError(n: NodeObj) =
 proc getBlob(env: Env; path: string): tuple[id: BlobId, size: BiggestInt] =
   result = env.blobs.getOrDefault(path)
   if result.size == 0:
-    result = env.store.ingestFile(path)
+    result = waitFor env.store.ingestFile(path)
     if result.size != 0:
       env.blobs[path] = result
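Note that getBlob treats a zero size as a cache miss: a path is only remembered in env.blobs once ingestFile reports a non-zero size, so empty files are re-ingested on every lookup. As above, the synchronous caller drains the future immediately with waitFor.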

View File

@@ -411,7 +411,7 @@ type
   IngestStream* = ref IngestStreamObj
   IngestStreamObj* = object of RootObj
     cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.}
-    finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.}
+    finishImpl*: proc (s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.nimcall, gcsafe.}
     ingestImpl*: proc (s: IngestStream; buf: pointer; size: int): Future[void] {.nimcall, gcsafe.}

 proc close*(s: BlobStream) =
@@ -430,29 +430,29 @@ proc pos*(s: BlobStream): BiggestInt =
   assert(not s.getPosImpl.isNil)
   s.getPosImpl(s)

-proc read*(s: BlobStream; buf: pointer; len: Natural): int =
+proc read*(s: BlobStream; buf: pointer; len: Natural): Future[int] =
   assert(not s.readImpl.isNil)
-  waitFor s.readImpl(s, buf, len)
+  s.readImpl(s, buf, len)

 proc cancle*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
   ## Cancel and close ingest stream
   assert(not s.cancelImpl.isNil)
   s.cancelImpl(s)

-proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
+proc finish*(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
   ## Finish ingest stream
   assert(not s.finishImpl.isNil)
   s.finishImpl(s)

-proc ingest*(s: IngestStream; buf: pointer; size: Natural) =
+proc ingest*(s: IngestStream; buf: pointer; size: Natural): Future[void] =
   ## Ingest stream
   assert(not s.ingestImpl.isNil)
-  waitFor s.ingestImpl(s, buf, size)
+  s.ingestImpl(s, buf, size)

-proc ingest*(s: IngestStream; buf: string) =
+proc ingest*(s: IngestStream; buf: string): Future[void] =
   ## Ingest stream
   assert(not s.ingestImpl.isNil)
-  waitFor s.ingestImpl(s, buf[0].unsafeAddr, buf.len)
+  s.ingestImpl(s, buf[0].unsafeAddr, buf.len)

 type
   BlobStore* = ref BlobStoreObj
@@ -488,12 +488,15 @@ proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKi
     getPosImpl: getPosNull,
     readImpl: nullBlobRead)

-proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
+proc nullFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
   var s = NullIngestStream(s)
   s.leaves.add finish(s.ctx)
   compressTree(s.leaves)
-  result.id = s.leaves[0]
-  result.size = s.pos
+  var pair: tuple[id: BlobId, size: BiggestInt]
+  pair.id = s.leaves[0]
+  pair.size = s.pos
+  result = newFuture[tuple[id: BlobId, size: BiggestInt]]()
+  complete result, pair

 proc nullIngest(s: IngestStream; buf: pointer; len: Natural): Future[void] =
   var
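nullFinish illustrates the commit's recurring trick for keeping a synchronous implementation behind the now-async interface: compute the result eagerly, then return a future that is already complete. A standalone sketch of that pattern (the names and values here are illustrative, not from the repository):

  import std/asyncdispatch

  type Pair = tuple[id: string, size: int]

  proc syncFinish(): Future[Pair] =
    # do the work synchronously...
    let pair: Pair = (id: "00ff", size: 42)
    # ...then hand back a future completed before it is returned
    result = newFuture[Pair]("syncFinish")
    complete result, pair

  echo waitFor syncFinish()   # returns immediately; nothing is pending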
@@ -551,13 +554,13 @@ iterator dumpBlob*(store: BlobStore; id: BlobId): string =
     close stream
   while true:
     buf.setLen(blobLeafSize)
-    let n = stream.read(buf[0].addr, buf.len)
+    let n = waitFor stream.read(buf[0].addr, buf.len)
     if n == 0:
       break
     buf.setLen(n)
     yield buf

-proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
+proc loadSet(store: BlobStore; id: SetId; depth: int): Future[BlobSet] {.async.} =
   if Key.high shr depth == 0:
     raiseAssert("loadSet trie is too deep")
   var
@@ -565,7 +568,7 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
     buf = newString(blobLeafSize)
   defer:
     close stream
-  let n = stream.read(buf[0].addr, buf.len)
+  let n = await stream.read(buf[0].addr, buf.len)
   buf.setLen(n)
   let
     c = buf.parseCbor.val
@@ -581,7 +584,8 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
     let node = c[i].val
     case c[i].tag.int
     of nodeTag:
-      result.table.add loadSet(store, node.toSetId, depth+1)
+      let child = await loadSet(store, node.toSetId, depth+1)
+      result.table.add child
     of leafTag:
       let
         leaf = BlobSet(
@@ -594,7 +598,7 @@ proc loadSet(store: BlobStore; id: SetId; depth: int): BlobSet =
     raise newException(ValueError, "invalid set CBOR")

 proc loadSet*(store: BlobStore; id: SetId): BlobSet =
-  loadSet store, id, 0
+  waitFor loadSet(store, id, 0)

 proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
   assert(bs.kind == hotNode)
@@ -605,8 +609,8 @@ proc commit*(store: BlobStore; bs: BlobSet): BlobSet =
       e = store.commit e
   let stream = store.openIngestStream(kind=metaBlob)
   var buf = encode bs.toCbor
-  stream.ingest(buf)
-  let (id, _) = finish stream
+  waitFor stream.ingest(buf)
+  let (id, _) = waitFor finish(stream)
   result = BlobSet(kind: coldNode, setId: id)

 proc apply*(store: BlobStore; bs: BlobSet; name: string; f: proc (id: BlobId; size: BiggestInt)) =
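loadSet is now a recursive {.async.} proc that awaits its children, while the exported loadSet* overload keeps the old blocking signature by wrapping the async one in waitFor. A toy sketch of the same two-layer arrangement:

  import std/asyncdispatch

  proc countDown(n: int): Future[int] {.async.} =
    # an async proc may await a recursive call, as loadSet does
    if n == 0: return 0
    let rest = await countDown(n - 1)
    return rest + 1

  proc countDownSync(n: int): int =
    # public blocking wrapper, mirroring loadSet*(store, id)
    waitFor countDown(n)

  echo countDownSync(5)   # 5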

View File

@@ -4,7 +4,7 @@ import std/asyncfile, std/asyncdispatch, std/os
 import nimcrypto/blake2

-proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: BiggestInt] =
+proc ingestFile*(store: BlobStore; path: string): Future[tuple[id: BlobId, size: BiggestInt]] {.async.} =
   ## Ingest a file and return blob metadata.
   let
     file = openAsync(path, fmRead)
@@ -15,10 +15,10 @@ proc ingestFile*(store: BlobStore; path: string): tuple[id: BlobId, size: Bigges
   if fileSize > 0:
     var buf = newString(min(blobLeafSize, fileSize))
     while true:
-      let n = waitFor file.readBuffer(buf[0].addr, buf.len)
+      let n = await file.readBuffer(buf[0].addr, buf.len)
       if n == 0: break
-      stream.ingest(buf[0].addr, n)
-  result = finish stream
+      await stream.ingest(buf[0].addr, n)
+  return await finish stream

 type
   FsBlobStream = ref FsBlobStreamObj
@@ -71,14 +71,22 @@ proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind
   except:
     raise newException(KeyError, "blob not in file-system store")

-proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
-  var s = FsIngestStream(s)
+proc fsFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] =
+  var
+    s = FsIngestStream(s)
+    pair: tuple[id: BlobId, size: BiggestInt]
   close s.file
   s.leaves.add finish(s.ctx)
   compressTree(s.leaves)
-  result.id = s.leaves[0]
-  result.size = s.pos
-  moveFile(s.path, s.path.parentDir / result.id.toHex)
+  pair.id = s.leaves[0]
+  pair.size = s.pos
+  let finalPath = s.path.parentDir / pair.id.toHex
+  if fileExists finalPath:
+    removeFile s.path
+  else:
+    moveFile(s.path, finalPath)
+  result = newFuture[tuple[id: BlobId, size: BiggestInt]]()
+  complete result, pair

 proc fsIngest(s: IngestStream; buf: pointer; len: Natural) {.async.} =
   var
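Besides the async signature, fsFinish now tolerates a pre-existing blob: identical content hashes to the same file name, so the staged file is discarded rather than moved over the stored copy. A standalone sketch of that rename-or-discard step (the paths and digest are made up):

  import std/os

  proc finalize(tmpPath, hexId: string) =
    # move the staged file to its content-addressed name, or drop
    # it if an identical blob is already stored there
    let finalPath = tmpPath.parentDir / hexId
    if fileExists finalPath:
      removeFile tmpPath
    else:
      moveFile(tmpPath, finalPath)

  let staging = getTempDir() / "staging.tmp"
  writeFile(staging, "blob bytes")
  finalize(staging, "00ff")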

View File

@@ -53,7 +53,7 @@ proc head(hss: HttpStoreServer; req: Request): Future[void] =
   # cache the stream in the blob table or raise an exception
   req.respond(Http200, "")

-proc get(hss: HttpStoreServer; req: Request): Future[void] =
+proc get(hss: HttpStoreServer; req: Request) {.async.} =
   let stream = hss.blobStream(req.url.path)
   var
     pos: BiggestInt
@@ -66,17 +66,17 @@ proc get(hss: HttpStoreServer; req: Request): Future[void] =
       len = endPos - startPos
   stream.pos = pos
   var body = newString(len)
-  len = stream.read(body[0].addr, len)
+  len = await stream.read(body[0].addr, len)
   body.setLen len
   let headers = newHttpHeaders({"Range": "bytes=$1-$2" % [ $pos, $(pos+len) ]})
-  req.respond(Http206, body, headers)
+  await req.respond(Http206, body, headers)

-proc putIngest(hss: HttpStoreServer; req: Request): Future[void] =
+proc putIngest(hss: HttpStoreServer; req: Request) {.async.} =
   let stream = hss.ingests[req.url.path]
-  stream.ingest(req.body)
-  req.respond(Http204, "")
+  await stream.ingest(req.body)
+  await req.respond(Http204, "")

-proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
+proc postIngest(hss: HttpStoreServer; req: Request) {.async.} =
   case req.url.path
   of "/ingest/" & $dataBlob:
     let
@@ -84,24 +84,24 @@ proc postIngest(hss: HttpStoreServer; req: Request): Future[void] =
       key = "/ingest/" & $hss.rng.next
       headers = newHttpHeaders({"Location": key})
     hss.ingests[key] = hss.store.openIngestStream(size, dataBlob)
-    req.respond(Http201, "", headers)
+    await req.respond(Http201, "", headers)
   of "/ingest/" & $metaBlob:
     let
       size = parseBiggestInt req.headers["ingest-size"]
       key = "/ingest/" & $hss.rng.next
       headers = newHttpHeaders({"Location": key})
     hss.ingests[key] = hss.store.openIngestStream(size, metaBlob)
-    req.respond(Http201, "", headers)
+    await req.respond(Http201, "", headers)
   else:
     let
       stream = hss.ingests[req.url.path]
-      (blob, size) = finish stream
+      (blob, size) = await finish stream
       headers = newHttpHeaders({
         "blob-id": blob.toHex,
         "blob-size": $size,
       })
     hss.ingests.del req.url.path
-    req.respond(Http204, "", headers)
+    await req.respond(Http204, "", headers)

 proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] =
   ## Serve requests to HTTP store.
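Each handler becomes {.async.} and awaits every req.respond, so a failed send rejects the handler's own future instead of being silently dropped. The same shape in a minimal asynchttpserver program (the port and body are arbitrary):

  import std/asyncdispatch, std/asynchttpserver

  var server = newAsyncHttpServer()

  proc handler(req: Request) {.async.} =
    # awaiting respond propagates any send error to this future
    await req.respond(Http200, "ok")

  waitFor server.serve(Port(8080), handler)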

View File

@@ -67,13 +67,16 @@ proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: Bl
     raise newException(KeyError,
       "unrecognized " & $resp.code & " response from HTTP store: " & resp.status)

-proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] =
-  var s = HttpIngestStream(s)
-  let resp = waitFor s.client.request($s.url, HttpPOST)
+proc httpFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.async.} =
+  var
+    s = HttpIngestStream(s)
+    pair: tuple[id: BlobId, size: BiggestInt]
+  let resp = await s.client.request($s.url, HttpPOST)
   if not resp.code.is2xx:
     raiseAssert(resp.status)
-  result.id = toBlobId resp.headers["blob-id"]
-  result.size = parseBiggestInt resp.headers["blob-size"]
+  pair.id = toBlobId resp.headers["blob-id"]
+  pair.size = parseBiggestInt resp.headers["blob-size"]
+  return pair

 proc httpIngest(x: IngestStream; buf: pointer; len: Natural) {.async.} =
   var
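Unlike the null and file-system backends, httpFinish takes the {.async.} route, so `return pair` completes the proc's implicit future; the manual newFuture/complete construction is presumably reserved for backends that never await anything.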

View File

@@ -1,3 +1,4 @@
+import asyncdispatch
 import ../blobsets, ./filestores, ./httpstores
 import spryvm/spryvm
 import cbor
@@ -27,7 +28,7 @@ proc insertPath(store: BlobStore; bs: BlobSet; kind: PathComponent; path: string
   case kind
   of pcFile, pcLinkToFile:
     var path = normalizedPath path
-    let (id, size) = store.ingestFile(path)
+    let (id, size) = waitFor store.ingestFile(path)
     path.removePrefix(getCurrentDir())
     path.removePrefix("/")
     result = result.insert(path, id, size)