Skip to content

Commit

Permalink
feat: direct fetch for HTTP retrievals
Browse files Browse the repository at this point in the history
When the retrieving the content using the Trustless HTTP Gateway
protocol ("http"), fetch the content directly from the provider,
do not use Lassie.

This should give us better visibility into various error statuses
returned by providers, e.g. 429 Too Many Requests, which Lassie converts
to generic 502 Bad Gateway error.

List of synthetic status codes corresponding to different errors we may
encounter along the new codepath:

- 900 - unknown error (fallback)
- 901 - provider's multiaddr is not "tcp"
- 902 - provider's multiaddr is not "https"
- 903 - provider's multiaddr has too many parts
- 911 - provider's hostname cannot be resolved via DNS
- 912 - TCP connection error
- 921 - CID uses an unsupported hash algorithm
- 922 - payload's hash does not match the CID
- 923 - provider returned unexpected blocks in the CAR response

Signed-off-by: Miroslav Bajtoš <[email protected]>
  • Loading branch information
bajtos committed Jul 9, 2024
1 parent 94b95e4 commit 8d0fe85
Show file tree
Hide file tree
Showing 7 changed files with 8,346 additions and 16 deletions.
8 changes: 8 additions & 0 deletions deps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,11 @@
export { encodeHex } from 'https://deno.land/[email protected]/encoding/hex.ts'
export { decodeBase64 } from 'https://deno.land/[email protected]/encoding/base64.ts'
export { decode as decodeVarint } from 'https://deno.land/x/[email protected]/varint.ts'

// Deno Bundle does not support npm dependencies, we have to load the via CDN
export { CarBlockIterator } from 'https://cdn.skypack.dev/@ipld/[email protected]/?dts'
export {
UnsupportedHashError,
HashMismatchError,
validateBlock
} from 'https://cdn.skypack.dev/@web3-storage/[email protected]/?dts'
48 changes: 48 additions & 0 deletions lib/multiaddr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/**
* @param {string} addr Multiaddr, e.g. `/ip4/127.0.0.1/tcp/80/http`
* @returns {string} Parsed URI, e.g. `http://127.0.0.1:80`
*/
export function multiaddrToHttpUri (addr) {
const [, hostType, hostValue, ipProtocol, port, scheme, ...rest] = addr.split('/')

if (ipProtocol !== 'tcp') {
throw Object.assign(
new Error(`Cannot parse "${addr}": unsupported protocol "${ipProtocol}"`),
{ code: 'UNSUPPORTED_MULTIADDR_PROTO' }
)
}

if (scheme !== 'http' && scheme !== 'https') {
throw Object.assign(
new Error(`Cannot parse "${addr}": unsupported scheme "${scheme}"`),
{ code: 'UNSUPPORTED_MULTIADDR_SCHEME' }
)
}

if (rest.length) {
throw Object.assign(
new Error(`Cannot parse "${addr}": too many parts`),
{ code: 'MULTIADDR_HAS_TOO_MANY_PARTS' }
)
}

return `${scheme}://${getUriHost(hostType, hostValue)}${buildUriPort(scheme, port)}`
}

function getUriHost (hostType, hostValue) {
switch (hostType) {
case 'ip4':
case 'dns':
case 'dns4':
case 'dns6':
return hostValue
case 'ip6':
return `[${hostValue}]`
}
}

function buildUriPort (scheme, port) {
if (scheme === 'http' && port === '80') return ''
if (scheme === 'https' && port === '443') return ''
return `:${port}`
}
107 changes: 93 additions & 14 deletions lib/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,14 @@ import { ActivityState } from './activity-state.js'
import { SPARK_VERSION, MAX_CAR_SIZE, APPROX_ROUND_LENGTH_IN_MS } from './constants.js'
import { queryTheIndex } from './ipni-client.js'
import { getMinerPeerId as defaultGetMinerPeerId } from './miner-info.js'
import { multiaddrToHttpUri } from './multiaddr.js'

import {
encodeHex
CarBlockIterator,
encodeHex,
HashMismatchError,
UnsupportedHashError,
validateBlock
} from '../vendor/deno-deps.js'

const sleep = dt => new Promise(resolve => setTimeout(resolve, dt))
Expand Down Expand Up @@ -77,25 +83,15 @@ export default class Spark {
stats.protocol = provider.protocol
stats.providerAddress = provider.address

const searchParams = new URLSearchParams({
// See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
// Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
'dag-scope': 'block',
protocols: provider.protocol,
providers: provider.address
})
const url = `ipfs://${retrieval.cid}?${searchParams.toString()}`
try {
await this.fetchCAR(url, stats)
await this.fetchCAR(provider.protocol, provider.address, retrieval.cid, stats)
} catch (err) {
console.error(`Failed to fetch ${url}`)
console.error(`Failed to fetch ${retrieval.cid} from ${provider.address} using ${provider.protocol}`)
console.error(err)
}
}

async fetchCAR (url, stats) {
console.log(`Fetching: ${url}`)

async fetchCAR (protocol, address, cid, stats) {
// Abort if no progress was made for 60 seconds
const controller = new AbortController()
const { signal } = controller
Expand All @@ -116,6 +112,9 @@ export default class Spark {
const carBytes = new Uint8Array(carBuffer)

try {
const url = getRetrievalUrl(protocol, address, cid)
console.log(`Fetching: ${url}`)

resetTimeout()
const res = await this.#fetch(url, { signal })
stats.statusCode = res.status
Expand Down Expand Up @@ -146,6 +145,14 @@ export default class Spark {
}

if (!stats.carTooLarge) {
try {
await verifyContent(cid, carBytes)
stats.contentVerification = 'OK'
} catch (err) {
console.error('Content verification failed.', err)
stats.contentVerification = 'ERROR_' + (err.code ?? 'UNKNOWN')
}

const digest = await crypto.subtle.digest('sha-256', carBytes)
// 12 is the code for sha2-256
// 20 is the digest length (32 bytes = 256 bits)
Expand All @@ -155,6 +162,11 @@ export default class Spark {
console.error('Retrieval failed with status code %s: %s',
res.status, (await res.text()).trimEnd())
}
} catch (err) {
if (!stats.statusCode) {
stats.statusCode = mapErrorToStatusCode(err)
}
throw err
} finally {
clearTimeout(timeout)
}
Expand Down Expand Up @@ -240,6 +252,73 @@ export default class Spark {
}
}

function getRetrievalUrl (protocol, address, cid) {
if (protocol === 'http') {
const baseUrl = multiaddrToHttpUri(address)
return `${baseUrl}/ipfs/${cid}?dag-scope=block`
}

const searchParams = new URLSearchParams({
// See https://github.com/filecoin-project/lassie/blob/main/docs/HTTP_SPEC.md#dag-scope-request-query-parameter
// Only the root block at the end of the path is returned after blocks required to verify the specified path segments.
'dag-scope': 'block',
protocols: protocol,
providers: address
})
return `ipfs://${cid}?${searchParams.toString()}`
}

/**
* @param {string} cid
* @param {Uint8Array} carBytes
*/
async function verifyContent (cid, carBytes) {
const reader = await CarBlockIterator.fromBytes(carBytes)
for await (const block of reader) {
if (block.cid.toString() !== cid.toString()) {
throw Object.assign(
new Error(`Unexpected block CID ${block.cid}. Expected: ${cid}`),
{ code: 'UNEXPECTED_CAR_BLOCK' }
)
}

await validateBlock(block)
}
}

function mapErrorToStatusCode (err) {
// 90x codes for multiaddr parsing errors
switch (err.code) {
case 'UNSUPPORTED_MULTIADDR_PROTO':
return 901
case 'UNSUPPORTED_MULTIADDR_SCHEME':
return 902
case 'MULTIADDR_HAS_TOO_MANY_PARTS':
return 903
}

// 92x for content verification errors
if (err instanceof UnsupportedHashError) {
return 921
} else if (err instanceof HashMismatchError) {
return 922
} else if (err.code === 'UNEXPECTED_CAR_BLOCK') {
return 923
}

// 91x errors for network connection errors
// Unfortunately, the Fetch API does not support programmatic detection of various error
// conditions. We have to check the error message text.
if (err.message.includes('dns error')) {
return 911
} else if (err.message.includes('tcp connect error')) {
return 912
}

// Fallback code for unknown errors
return 900
}

async function assertOkResponse (res, errorMsg) {
if (res.ok) return

Expand Down
2 changes: 2 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import './test/ipni-client.test.js'
import './test/miner-info.test.js'
import './test/multiaddr.test.js'

import './test/integration.js'
import './test/spark.js'
33 changes: 33 additions & 0 deletions test/multiaddr.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { test } from 'zinnia:test'
import { assertEquals, assertThrows } from 'zinnia:assert'
import { multiaddrToHttpUri } from '../lib/multiaddr.js'

const HAPPY_CASES = [
['/ip4/127.0.0.1/tcp/80/http', 'http://127.0.0.1'],
['/ip4/127.0.0.1/tcp/8080/http', 'http://127.0.0.1:8080'],
['/ip4/127.0.0.1/tcp/443/https', 'https://127.0.0.1'],
['/ip4/127.0.0.1/tcp/8080/https', 'https://127.0.0.1:8080'],
['/dns/meridian.space/tcp/8080/http', 'http://meridian.space:8080'],
['/dns4/meridian.space/tcp/8080/http', 'http://meridian.space:8080'],
['/dns6/meridian.space/tcp/8080/http', 'http://meridian.space:8080']
]

for (const [multiaddr, expectedUri] of HAPPY_CASES) {
test(`parse ${multiaddr}`, () => {
const uri = multiaddrToHttpUri(multiaddr)
assertEquals(uri, expectedUri)
})
}

const ERROR_CASES = [
['/ip4/127.0.0.1/tcp/80', 'Cannot parse "/ip4/127.0.0.1/tcp/80": unsupported scheme "undefined"'],
['/ip4/127.0.0.1/udp/90', 'Cannot parse "/ip4/127.0.0.1/udp/90": unsupported protocol "udp"'],
['/ip4/127.0.0.1/tcp/8080/http/p2p/pubkey', 'Cannot parse "/ip4/127.0.0.1/tcp/8080/http/p2p/pubkey": too many parts']
]

for (const [multiaddr, expectedError] of ERROR_CASES) {
test(`parse ${multiaddr}`, () => {
const err = assertThrows(() => multiaddrToHttpUri(multiaddr))
assertEquals(err.message, expectedError)
})
}
4 changes: 2 additions & 2 deletions test/spark.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ test('fetchCAR', async () => {
carChecksum: null,
statusCode: null
}
await spark.fetchCAR(URL, stats)
await spark.fetchCAR('http', '127.0.0.1', 'bafy', stats)
assertEquals(stats.timeout, false)
assertInstanceOf(stats.startAt, Date)
assertInstanceOf(stats.firstByteAt, Date)
Expand Down Expand Up @@ -104,7 +104,7 @@ test('fetchCAR exceeding MAX_CAR_SIZE', async () => {
carChecksum: null,
statusCode: null
}
await spark.fetchCAR(URL, stats)
await spark.fetchCAR('http', '127.0.0.1', 'bafy', stats)
assertEquals(stats.timeout, false)
assertEquals(stats.carTooLarge, true)
assertEquals(stats.byteLength, MAX_CAR_SIZE + 1)
Expand Down
Loading

0 comments on commit 8d0fe85

Please sign in to comment.