blobsets/src/blobsets/httpstores.nim

142 lines
4.2 KiB
Nim

import ../blobsets
import tiger
import std/asyncdispatch, std/httpclient, std/strutils, std/uri
type
  # Read side: one blob addressed by URL, fetched with HTTP range requests.
  HttpBlobStream = ref HttpBlobStreamObj
  HttpBlobStreamObj = object of BlobStreamObj
    # Client the stream issues its requests through.
    client: AsyncHttpClient
    # URL of the blob being read.
    url: string
    # Byte offset at which the next range request starts.
    rangePos: BiggestInt

  # Write side: data is PUT to an ingest URL and finalised with a POST.
  HttpIngestStream = ref HttpIngestStreamObj
  HttpIngestStreamObj = object of IngestStreamObj
    # Client the stream issues its requests through.
    client: AsyncHttpClient
    # Ingest URL the stream sends its PUT and POST requests to.
    url: string
    # NOTE(review): `ctx`, `leaves`, `buffOff`, `pos` and `nodeOffset` are
    # not read or written by any proc visible in this file; `leaf` is only
    # allocated when the stream is opened. Confirm whether they are still
    # needed.
    ctx: TigerState
    leaves: seq[BlobId]
    leaf: string
    buffOff: int
    pos: BiggestInt
    nodeOffset: BiggestInt

  # Blob store reached over HTTP.
  HttpStore* = ref HttpStoreObj
  HttpStoreObj = object of BlobStoreObj
    # Client handed to every stream opened from this store.
    client: AsyncHttpClient
    # TODO: keep a future that completes after the current client
    # transaction completes, chain new streams that use the same client
    # connection to this future
    # Base URL of the store; blob and ingest URLs are derived from it.
    url: Uri
proc httpBlobClose(s: BlobStream) =
  ## Close hook for HTTP blob streams. Nothing is done here; the HTTP
  ## client is not touched.
  discard
proc httpBlobSize(s: BlobStream): BiggestInt =
  ## Size hook for HTTP blob streams.
  ##
  ## Not implemented: no request is made and the result is always 0.
  # The conversion is kept so a stream of the wrong dynamic type is still
  # rejected, exactly as before.
  discard HttpBlobStream(s)
  result = 0
proc setPosHttp(s: BlobStream; pos: BiggestInt) =
  ## Store `pos` as the byte offset the next read will request from.
  ## No request is made and `pos` is not validated.
  let stream = HttpBlobStream(s)
  stream.rangePos = pos
proc getPosHttp(s: BlobStream): BiggestInt =
  ## Return the byte offset the next read will request from.
  let stream = HttpBlobStream(s)
  result = stream.rangePos
proc httpBlobRead(s: BlobStream; buffer: pointer; len: Natural): Future[int] {.async.} =
  ## Read up to `len` bytes from the stream's current position into
  ## `buffer` with an HTTP range request, then advance the position by the
  ## number of bytes copied.
  ##
  ## Returns the number of bytes copied. The result is 0 when `len` is 0 or
  ## when the server answers 416 (the position is not inside the blob).
  ## Raises `IOError` for any status other than 200, 206 or 416.
  assert(not buffer.isNil)
  if len == 0:
    # A zero-length read would need the invalid range "bytes=N-(N-1)".
    return 0
  let
    s = HttpBlobStream(s)
    # HTTP byte ranges are inclusive at both ends, so the last byte wanted
    # is `pos + len - 1`, not `pos + len`.
    lastByte = s.rangePos + BiggestInt(len) - 1
    headers = newHttpHeaders({
      "range": "bytes=$1-$2" % [$s.rangePos, $lastByte]})
    # `await` rather than `waitFor`: blocking on the dispatcher from inside
    # an async proc stalls every other pending future.
    resp = await s.client.request(s.url, HttpGET, headers = headers)
  # Always drain the body so the connection is left in a clean state.
  var body = await resp.body
  let status = resp.code
  if status == Http416:
    return 0
  if status != Http206 and status != Http200:
    raise newException(IOError,
      "unexpected " & $status & " response from HTTP store: " & resp.status)
  # A 200 reply means the Range header was ignored and the whole blob was
  # sent, so the wanted bytes start at the stream position, not at 0.
  let offset =
    if status == Http200: int(min(s.rangePos, BiggestInt(body.len)))
    else: 0
  result = min(body.len - offset, len)
  if result > 0:
    copyMem(buffer, body[offset].addr, result)
    s.rangePos.inc result
proc httpOpenBlobStream(store: BlobStore; id: BlobId; size: BiggestInt; kind: BlobKind): BlobStream =
  ## Open a read stream for blob `id` of the given `kind`.
  ##
  ## The blob URL is `<store url>/<kind>/<hex id>`. A blocking HEAD request
  ## checks that the blob exists before the stream is returned. `size` is
  ## not consulted.
  ##
  ## Raises `KeyError` when the server answers 404, and also for any other
  ## status that is not 200 or 204.
  let
    httpStore = HttpStore(store)
    blobUrl = $((httpStore.url / $kind) / id.toHex)
    resp = waitFor httpStore.client.request(blobUrl, HttpHEAD)
    status = resp.code
  if status == Http404:
    raise newException(KeyError, "blob not at HTTP store")
  if status != Http200 and status != Http204:
    raise newException(KeyError,
      "unrecognized " & $status & " response from HTTP store: " & resp.status)
  result = HttpBlobStream(
    client: httpStore.client,
    closeImpl: httpBlobClose,
    sizeImpl: httpBlobSize,
    setPosImpl: setPosHttp,
    getPosImpl: getPosHttp,
    readImpl: httpBlobRead,
    url: blobUrl
  )
proc httpFinish(s: IngestStream): Future[tuple[id: BlobId, size: BiggestInt]] {.async.} =
  ## Finalise an ingest by POSTing to the stream's ingest URL.
  ##
  ## The blob id and size are taken from the `blob-id` and `blob-size`
  ## response headers. A non-2xx response fails with `raiseAssert`,
  ## carrying the response status line.
  let
    ingest = HttpIngestStream(s)
    resp = await ingest.client.request(ingest.url, HttpPOST)
  if not resp.code.is2xx:
    raiseAssert(resp.status)
  let
    blobId = toBlobId resp.headers["blob-id"]
    blobSize = parseBiggestInt resp.headers["blob-size"]
  return (id: blobId, size: blobSize)
proc httpIngest(x: IngestStream; buf: pointer; len: Natural) {.async.} =
  ## Send `len` bytes starting at `buf` to the stream's ingest URL with a
  ## PUT request.
  ##
  ## Any response other than 204 fails with `raiseAssert`, carrying the
  ## response status line.
  let s = HttpIngestStream(x)
  var body = newString(len)
  if len > 0:
    # `body[0]` is out of bounds on an empty string, so only touch the
    # buffers when there is something to copy.
    copyMem(body[0].addr, buf, len)
  # TODO: zero-copy
  let resp = await s.client.request($s.url, HttpPUT, body)
  if resp.code != Http204:
    raiseAssert(resp.status)
proc httpOpenIngestStream(s: BlobStore; size: BiggestInt; kind: BlobKind): IngestStream =
  ## Open an ingest stream for a blob of the given `kind`.
  ##
  ## A blocking POST to `<store url>/ingest/<kind>` announces `size` in the
  ## `ingest-size` header. The server must answer 201 with a `Location`
  ## header naming the ingest URL; a relative location is resolved against
  ## the store URL. Any other status fails with `raiseAssert`, carrying the
  ## response status line.
  let
    httpStore = HttpStore(s)
    endpoint = (httpStore.url / "ingest") / $kind
    sizeHeader = newHttpHeaders({"ingest-size": $size})
    resp = waitFor httpStore.client.request(
      $endpoint, HttpPOST, headers = sizeHeader)
  if resp.code != Http201:
    raiseAssert(resp.status)
  var location = parseUri resp.headers["Location"]
  if not location.isAbsolute:
    location = combine(httpStore.url, location)
  result = HttpIngestStream(
    finishImpl: httpFinish,
    ingestImpl: httpIngest,
    client: httpStore.client,
    url: $location,
    leaf: newString(min(size.int, blobLeafSize))
  )
proc httpCloseStore(s: BlobStore) =
  ## Close hook for the store: closes the store's HTTP client.
  let store = HttpStore(s)
  store.client.close()
proc httpContains(s: BlobStore; id: BlobId; kind: BlobKind): Future[bool] {.async.} =
  ## Report whether blob `id` of the given `kind` is present at the store.
  ##
  ## Issues a HEAD request for `<store url>/<kind>/<hex id>` and returns
  ## true for a 200 or 204 answer. If the request fails with a catchable
  ## error, the client is closed and false is returned.
  let s = HttpStore(s)
  try:
    let
      url = $((s.url / $kind) / id.toHex)
      resp = await s.client.request(url, HttpHEAD)
    return resp.code in {Http200, Http204}
  except CatchableError:
    # Only recoverable failures count as "not present"; a `Defect` signals
    # a programming error and must not be hidden behind a `false` result.
    close s.client
    return false
proc newHttpStore*(url: string): HttpStore =
  ## Create a blob store backed by the HTTP server at `url`.
  ##
  ## A new async HTTP client is created for the store and the store hooks
  ## are set to the HTTP implementations in this module. No request is
  ## made here.
  let
    httpClient = newAsyncHttpClient()
    baseUrl = parseUri(url)
  result = HttpStore(
    closeImpl: httpCloseStore,
    containsImpl: httpContains,
    openBlobStreamImpl: httpOpenBlobStream,
    openIngestStreamImpl: httpOpenIngestStream,
    client: httpClient,
    url: baseUrl
  )