From a315eb645aca43a115f92f08e77afaf2fb1070c3 Mon Sep 17 00:00:00 2001 From: Emery Hemingway Date: Fri, 8 Feb 2019 16:57:46 +0100 Subject: [PATCH] Make read and ingest asynchronous --- src/blobsets.nim | 28 +++++++++++++--------------- src/blobsets/filestores.nim | 8 ++++---- src/blobsets/httpservers.nim | 13 ++++++++----- src/blobsets/httpstores.nim | 8 ++++---- 4 files changed, 29 insertions(+), 28 deletions(-) diff --git a/src/blobsets.nim b/src/blobsets.nim index 8bd316b..f3c081e 100644 --- a/src/blobsets.nim +++ b/src/blobsets.nim @@ -1,3 +1,4 @@ +import std/asyncdispatch import std/hashes, std/streams, std/strutils, std/bitops, std/unicode, std/endians import cbor, siphash import ./blobsets/priv/hex @@ -406,12 +407,12 @@ type sizeImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.} setPosImpl*: proc (s: BlobStream; pos: BiggestInt) {.nimcall, gcsafe.} getPosImpl*: proc (s: BlobStream): BiggestInt {.nimcall, gcsafe.} - readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): int {.nimcall, gcsafe.} + readImpl*: proc (s: BlobStream; buffer: pointer; bufLen: int): Future[int] {.nimcall, gcsafe.} IngestStream* = ref IngestStreamObj IngestStreamObj* = object of RootObj cancelImpl*: proc (s: IngestStream) {.nimcall, gcsafe.} finishImpl*: proc (s: IngestStream): tuple[id: BlobId, size: BiggestInt] {.nimcall, gcsafe.} - ingestImpl*: proc (s: IngestStream; buf: pointer; size: int) {.nimcall, gcsafe.} + ingestImpl*: proc (s: IngestStream; buf: pointer; size: int): Future[void] {.nimcall, gcsafe.} proc close*(s: BlobStream) = assert(not s.closeImpl.isNil) @@ -431,7 +432,7 @@ proc pos*(s: BlobStream): BiggestInt = proc read*(s: BlobStream; buf: pointer; len: Natural): int = assert(not s.readImpl.isNil) - result = s.readImpl(s, buf, len) + waitFor s.readImpl(s, buf, len) proc cancle*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = ## Cancel and close ingest stream @@ -446,19 +447,12 @@ proc finish*(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = proc ingest*(s: IngestStream; buf: pointer; size: Natural) = ## Ingest stream assert(not s.ingestImpl.isNil) - s.ingestImpl(s, buf, size) + waitFor s.ingestImpl(s, buf, size) -proc ingest*(s: IngestStream; buf: var string) = +proc ingest*(s: IngestStream; buf: string) = ## Ingest stream assert(not s.ingestImpl.isNil) - s.ingestImpl(s, buf[0].addr, buf.len) - -proc ingest*(s: IngestStream; buf: string) {.deprecated.} = - ## Ingest stream - # TODO: zero-copy - assert(not s.ingestImpl.isNil) - var buf = buf - s.ingestImpl(s, buf[0].addr, buf.len) + waitFor s.ingestImpl(s, buf[0].unsafeAddr, buf.len) type BlobStore* = ref BlobStoreObj @@ -483,7 +477,9 @@ proc nullBlobClose(s: BlobStream) = discard proc setPosNull(s: BlobStream; pos: BiggestInt) = discard proc getPosNull(s: BlobStream): BiggestInt = discard -proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = 0 +proc nullBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] = + result = newFuture[int]() + complete result, 0 proc nullOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = BlobStream( @@ -499,7 +495,7 @@ proc nullFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = result.id = s.leaves[0] result.size = s.pos -proc nullIngest(s: IngestStream; buf: pointer; len: Natural) = +proc nullIngest(s: IngestStream; buf: pointer; len: Natural): Future[void] = var s = NullIngestStream(s) off = 0 @@ -521,6 +517,8 @@ proc nullIngest(s: IngestStream; buf: pointer; len: Natural) = s.ctx.update(buf[off].addr, n) off.inc n s.pos.inc n + result = newFuture[void]() + complete result proc nullOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream = NullIngestStream( diff --git a/src/blobsets/filestores.nim b/src/blobsets/filestores.nim index 97fd8b0..3a89957 100644 --- a/src/blobsets/filestores.nim +++ b/src/blobsets/filestores.nim @@ -51,9 +51,9 @@ proc getPosFs(s: BlobStream): BiggestInt = var s = FsBlobStream(s) (BiggestInt)s.file.getFilePos -proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = +proc fsBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] = var s = FsBlobStream(s) - result = waitFor s.file.readBuffer(buffer, len) + s.file.readBuffer(buffer, len) proc fsOpenBlobStream(s: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream = var fs = FileStore(s) @@ -80,7 +80,7 @@ proc fsFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = result.size = s.pos moveFile(s.path, s.path.parentDir / result.id.toHex) -proc fsIngest(s: IngestStream; buf: pointer; len: Natural) = +proc fsIngest(s: IngestStream; buf: pointer; len: Natural) {.async.} = var s = FsIngestStream(s) off = 0 @@ -100,7 +100,7 @@ proc fsIngest(s: IngestStream; buf: pointer; len: Natural) = else: n = min(n, blobLeafSize-leafOff) s.ctx.update(buf[off].addr, n) - waitFor s.file.writeBuffer(buf[off].addr, n) + await s.file.writeBuffer(buf[off].addr, n) off.inc n s.pos.inc n diff --git a/src/blobsets/httpservers.nim b/src/blobsets/httpservers.nim index 29324bb..2f2cafb 100644 --- a/src/blobsets/httpservers.nim +++ b/src/blobsets/httpservers.nim @@ -105,9 +105,9 @@ proc postIngest(hss: HttpStoreServer; req: Request): Future[void] = proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] = ## Serve requests to HTTP store. - proc handleRequest(req: Request): Future[void] = + proc handleRequest(req: Request) {.async.} = try: - case req.reqMethod + let fut = case req.reqMethod of HttpGET: get(hss, req) of HttpHEAD: @@ -118,10 +118,13 @@ proc serve*(hss: HttpStoreServer; port = Port(80)): Future[void] = postIngest(hss, req) else: req.respond(Http501, "method not implemented") + await fut except KeyError: - req.respond(Http404, "blob not found") + await req.respond(Http404, "blob not found") except ValueError: - req.respond(Http400, getCurrentExceptionMsg()) + await req.respond(Http400, getCurrentExceptionMsg()) except: - req.respond(Http500, getCurrentExceptionMsg()) + try: + await req.respond(Http500, getCurrentExceptionMsg()) + except: discard hss.server.serve(port, handleRequest) diff --git a/src/blobsets/httpstores.nim b/src/blobsets/httpstores.nim index 7db8a26..c44dff5 100644 --- a/src/blobsets/httpstores.nim +++ b/src/blobsets/httpstores.nim @@ -35,13 +35,13 @@ proc getPosHttp(s: BlobStream): BiggestInt = var s = (HttpBlobStream)s s.rangePos -proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): int = +proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] {.async.} = assert(not buffer.isNil) var s = (HttpBlobStream)s let headers = newHttpHeaders({"range": "bytes=$1-$2" % [ $s.rangePos, $(s.rangePos+len) ]}) resp = waitFor s.client.request(s.url, HttpGET, headers=headers) - var body = waitFor resp.body + var body = await resp.body result = (int)min(body.len, len) if result > 0: copyMem(buffer, body[0].addr, result) @@ -75,13 +75,13 @@ proc httpFinish(s: IngestStream): tuple[id: BlobId, size: BiggestInt] = result.id = toBlobId resp.headers["blob-id"] result.size = parseBiggestInt resp.headers["blob-size"] -proc httpIngest(x: IngestStream; buf: pointer; len: Natural) = +proc httpIngest(x: IngestStream; buf: pointer; len: Natural) {.async.} = var s = HttpIngestStream(x) body = newString(len) copyMem(body[0].addr, buf, body.len) # TODO: zero-copy - let resp = waitFor s.client.request($s.url, HttpPUT, body) + let resp = await s.client.request($s.url, HttpPUT, body) if resp.code != Http204: raiseAssert(resp.status)