diff --git a/packages/car/package.json b/packages/car/package.json
index ad71af5a1..9a08384e2 100644
--- a/packages/car/package.json
+++ b/packages/car/package.json
@@ -142,6 +142,7 @@
     "@helia/interface": "^4.3.0",
     "@ipld/car": "^5.3.0",
     "@libp2p/interfaces": "^3.3.2",
+    "@libp2p/utils": "^5.4.6",
     "interface-blockstore": "^5.2.10",
     "it-drain": "^3.0.5",
     "it-map": "^3.0.5",
@@ -151,10 +152,12 @@
     "progress-events": "^1.0.0"
   },
   "devDependencies": {
+    "@helia/mfs": "^3.0.6",
     "@helia/unixfs": "^3.0.6",
     "@ipld/dag-pb": "^4.1.0",
     "aegir": "^44.0.1",
     "blockstore-core": "^4.4.0",
+    "datastore-core": "^9.2.9",
     "ipfs-unixfs-importer": "^15.2.4",
     "it-to-buffer": "^4.0.5"
   },
diff --git a/packages/car/src/index.ts b/packages/car/src/index.ts
index b97acff88..23b247be0 100644
--- a/packages/car/src/index.ts
+++ b/packages/car/src/index.ts
@@ -67,6 +67,7 @@ import type { DAGWalker } from '@helia/interface'
 import type { GetBlockProgressEvents, PutManyBlocksProgressEvents } from '@helia/interface/blocks'
 import type { CarReader } from '@ipld/car'
 import type { AbortOptions } from '@libp2p/interfaces'
+import type { Filter } from '@libp2p/utils/filters'
 import type { Blockstore } from 'interface-blockstore'
 import type { CID } from 'multiformats/cid'
 import type { ProgressOptions } from 'progress-events'
@@ -76,6 +77,13 @@ export interface CarComponents {
   dagWalkers: Record<number, DAGWalker>
 }
 
+interface ExportCarOptions extends AbortOptions, ProgressOptions<GetBlockProgressEvents | PutManyBlocksProgressEvents> {
+  /**
+   * If a filter is passed it will be used to deduplicate blocks exported in the car file
+   */
+  blockFilter?: Filter
+}
+
 /**
  * The Car interface provides operations for importing and exporting Car files
  * from Helia's underlying blockstore.
@@ -129,7 +137,7 @@ export interface Car {
    * await eventPromise
    * ```
    */
-  export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents | PutManyBlocksProgressEvents>): Promise<void>
+  export(root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void>
 
   /**
    * Returns an AsyncGenerator that yields CAR file bytes.
@@ -170,7 +178,7 @@ class DefaultCar implements Car {
     ))
   }
 
-  async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: AbortOptions & ProgressOptions<GetBlockProgressEvents | PutManyBlocksProgressEvents>): Promise<void> {
+  async export (root: CID | CID[], writer: Pick<CarWriter, 'put' | 'close'>, options?: ExportCarOptions): Promise<void> {
     const deferred = defer<Error | undefined>()
     const roots = Array.isArray(root) ? root : [root]
 
@@ -189,6 +197,12 @@ class DefaultCar implements Car {
     for (const root of roots) {
       void queue.add(async () => {
         await this.#walkDag(root, queue, async (cid, bytes) => {
+          // if a filter has been passed, skip blocks that have already been written
+          if (options?.blockFilter?.has(cid.multihash.bytes) === true) {
+            return
+          }
+
+          options?.blockFilter?.add(cid.multihash.bytes)
           await writer.put({ cid, bytes })
         }, options)
       })
@@ -203,7 +217,7 @@
     }
   }
 
-  async * stream (root: CID | CID[], options?: AbortOptions & ProgressOptions<GetBlockProgressEvents | PutManyBlocksProgressEvents>): AsyncGenerator<Uint8Array, void, undefined> {
+  async * stream (root: CID | CID[], options?: ExportCarOptions): AsyncGenerator<Uint8Array, void, undefined> {
     const { writer, out } = CarWriter.create(root)
 
     // has to be done async so we write to `writer` and read from `out` at the
diff --git a/packages/car/test/index.spec.ts b/packages/car/test/index.spec.ts
index 91642b881..fabf871f0 100644
--- a/packages/car/test/index.spec.ts
+++ b/packages/car/test/index.spec.ts
@@ -1,9 +1,12 @@
 /* eslint-env mocha */
 
+import { mfs } from '@helia/mfs'
 import { type UnixFS, unixfs } from '@helia/unixfs'
 import { CarReader } from '@ipld/car'
+import { createScalableCuckooFilter } from '@libp2p/utils/filters'
 import { expect } from 'aegir/chai'
 import { MemoryBlockstore } from 'blockstore-core'
+import { MemoryDatastore } from 'datastore-core'
 import { fixedSize } from 'ipfs-unixfs-importer/chunker'
 import toBuffer from 'it-to-buffer'
 import { car, type Car } from '../src/index.js'
@@ -115,4 +118,55 @@ describe('import/export car file', () => {
     expect(await toBuffer(u.cat(cid2))).to.equalBytes(fileData2)
     expect(await toBuffer(u.cat(cid3))).to.equalBytes(fileData3)
   })
+
+  it('exports a car file without duplicates', async () => {
+    const otherBlockstore = new MemoryBlockstore()
+    const otherUnixFS = unixfs({ blockstore: otherBlockstore })
+    const otherDatastore = new MemoryDatastore()
+    const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
+    const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
+
+    await otherMFS.mkdir('/testDups')
+    await otherMFS.mkdir('/testDups/sub')
+
+    const sourceCid = await otherUnixFS.addBytes(smallFile)
+    await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
+    await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')
+
+    const rootObject = await otherMFS.stat('/testDups/')
+    const rootCid = rootObject.cid
+
+    const writer = memoryCarWriter(rootCid)
+    const blockFilter = createScalableCuckooFilter(5)
+    await otherCar.export(rootCid, writer, {
+      blockFilter
+    })
+
+    const carBytes = await writer.bytes()
+    expect(carBytes.length).to.equal(349)
+  })
+
+  it('exports a car file with duplicates', async () => {
+    const otherBlockstore = new MemoryBlockstore()
+    const otherUnixFS = unixfs({ blockstore: otherBlockstore })
+    const otherDatastore = new MemoryDatastore()
+    const otherMFS = mfs({ blockstore: otherBlockstore, datastore: otherDatastore })
+    const otherCar = car({ blockstore: otherBlockstore, dagWalkers })
+
+    await otherMFS.mkdir('/testDups')
+    await otherMFS.mkdir('/testDups/sub')
+
+    const sourceCid = await otherUnixFS.addBytes(smallFile)
+    await otherMFS.cp(sourceCid, '/testDups/a.smallfile')
+    await otherMFS.cp(sourceCid, '/testDups/sub/b.smallfile')
+
+    const rootObject = await otherMFS.stat('/testDups/')
+    const rootCid = rootObject.cid
+
+    const writer = memoryCarWriter(rootCid)
+    await otherCar.export(rootCid, writer)
+
+    const carBytes = await writer.bytes()
+    expect(carBytes.length).to.equal(399)
+  })
 })