77 */
88
99import { getSizeFromPieceCID } from '@filoz/synapse-core/piece'
10- import { METADATA_KEYS , type StorageContext , type Synapse , WarmStorageService } from '@filoz/synapse-sdk'
10+ import {
11+ type DataSetPieceData ,
12+ METADATA_KEYS ,
13+ PDPServer ,
14+ PDPVerifier ,
15+ type StorageContext ,
16+ type Synapse ,
17+ WarmStorageService ,
18+ } from '@filoz/synapse-sdk'
19+ import { reconcilePieceStatus } from '../piece/piece-status.js'
20+ import type { Warning } from '../utils/types.js'
1121import { isStorageContextWithDataSetId } from './type-guards.js'
12- import type {
13- DataSetPiecesResult ,
14- DataSetWarning ,
15- GetDataSetPiecesOptions ,
16- PieceInfo ,
17- StorageContextWithDataSetId ,
18- } from './types.js'
22+ import type { DataSetPiecesResult , GetDataSetPiecesOptions , PieceInfo , StorageContextWithDataSetId } from './types.js'
23+ import { PieceStatus } from './types.js'
1924
2025/**
2126 * Get all pieces for a dataset from a StorageContext
@@ -57,15 +62,58 @@ export async function getDataSetPieces(
5762 }
5863
5964 const pieces : PieceInfo [ ] = [ ]
60- const warnings : DataSetWarning [ ] = [ ]
65+ const warnings : Warning [ ] = [ ]
66+
67+ // Fetch the piece IDs scheduled for removal via PDPVerifier.getScheduledRemovals, then (best-effort) the provider's own piece list, which is used for orphan detection
68+ let scheduledRemovals : number [ ] = [ ]
69+ let pdpServerPieces : DataSetPieceData [ ] | null = null
70+ try {
71+ const warmStorage = await WarmStorageService . create ( synapse . getProvider ( ) , synapse . getWarmStorageAddress ( ) )
72+ const pdpVerifier = new PDPVerifier ( synapse . getProvider ( ) , warmStorage . getPDPVerifierAddress ( ) )
73+ scheduledRemovals = await pdpVerifier . getScheduledRemovals ( storageContext . dataSetId )
74+ try {
75+ const providerInfo = await synapse . getProviderInfo ( storageContext . provider . serviceProvider )
76+ const pdpServer = new PDPServer ( null , providerInfo . products ?. PDP ?. data ?. serviceURL ?? '' )
77+ const dataSet = await pdpServer . getDataSet ( storageContext . dataSetId )
78+ pdpServerPieces = dataSet . pieces
79+ } catch ( error ) {
80+ logger ?. warn ( { error } , 'Failed to fetch provider data for scheduled removals and orphan detection' )
81+ warnings . push ( {
82+ code : 'PROVIDER_DATA_UNAVAILABLE' ,
83+ message : 'Failed to fetch provider data; orphan detection disabled' ,
84+ context : { dataSetId : storageContext . dataSetId , error : String ( error ) } ,
85+ } )
86+ }
87+ } catch ( error ) {
88+ logger ?. warn ( { error } , 'Failed to get scheduled removals' )
89+ warnings . push ( {
90+ code : 'SCHEDULED_REMOVALS_UNAVAILABLE' ,
91+ message : 'Failed to get scheduled removals' ,
92+ context : { dataSetId : storageContext . dataSetId , error : String ( error ) } ,
93+ } )
94+ }
6195
6296 // Use the async generator to fetch all pieces
6397 try {
6498 const getPiecesOptions = { ...( signal && { signal } ) }
99+ const providerPiecesById = pdpServerPieces ? new Map ( pdpServerPieces . map ( ( piece ) => [ piece . pieceId , piece ] ) ) : null
65100 for await ( const piece of storageContext . getPieces ( getPiecesOptions ) ) {
66101 const pieceId = piece . pieceId
67102 const pieceCid = piece . pieceCid
68- const pieceInfo : PieceInfo = { pieceId, pieceCid : pieceCid . toString ( ) }
103+ const { status, warning } = reconcilePieceStatus ( {
104+ pieceId,
105+ pieceCid,
106+ scheduledRemovals,
107+ providerPiecesById,
108+ } )
109+ const pieceInfo : PieceInfo = {
110+ pieceId,
111+ pieceCid : pieceCid . toString ( ) ,
112+ status,
113+ }
114+ if ( warning ) {
115+ warnings . push ( warning )
116+ }
69117
70118 // Calculate piece size from CID
71119 try {
@@ -79,6 +127,24 @@ export async function getDataSetPieces(
79127
80128 pieces . push ( pieceInfo )
81129 }
130+ if ( providerPiecesById !== null ) {
131+ // reconcilePieceStatus removes provider matches as we stream on-chain pieces.
132+ // Remaining entries are only reported by the provider, which are off-chain orphans.
133+ for ( const piece of providerPiecesById . values ( ) ) {
134+ // Append each provider-only piece to the result list, marked as an off-chain orphan
135+ pieces . push ( {
136+ pieceId : piece . pieceId ,
137+ pieceCid : piece . pieceCid . toString ( ) ,
138+ status : PieceStatus . OFFCHAIN_ORPHANED ,
139+ } )
140+ warnings . push ( {
141+ code : 'OFFCHAIN_ORPHANED' ,
142+ message : 'Piece is reported by provider but not on-chain' ,
143+ context : { pieceId : piece . pieceId , pieceCid : piece . pieceCid . toString ( ) } ,
144+ } )
145+ }
146+ }
147+ pieces . sort ( ( a , b ) => a . pieceId - b . pieceId )
82148 } catch ( error ) {
83149 if ( error instanceof Error && error . name === 'AbortError' ) {
84150 throw error
@@ -122,7 +188,7 @@ async function enrichPiecesWithMetadata(
122188 synapse : Synapse ,
123189 storageContext : StorageContextWithDataSetId ,
124190 pieces : PieceInfo [ ] ,
125- warnings : DataSetWarning [ ] ,
191+ warnings : Warning [ ] ,
126192 logger ?: GetDataSetPiecesOptions [ 'logger' ]
127193) : Promise < void > {
128194 const dataSetId = storageContext . dataSetId
0 commit comments