接上篇,当Client 、 /fil/storage/mk/1.0.1 storage/mk/1.0.1 プロトコルのストリームを作成し、そのストリームを通じてストレージ トランザクションを送信します。このプロトコルを処理するのはHandleDealStream メソッドです。このメソッドは、処理のために独自のreceiveDeal メソッドを直接呼び出します。 receiveDeal メソッドはこれを次のように処理します。ストリームから保存されたProposal オブジェクトを読み取ります。 提案、エラー:= s.ReadDealProposal() ここでのストリーム オブジェクトは、元のストリーム オブジェクトをカプセル化するdealStream オブジェクト (storagemarket/network/deal_stream.go) です。 ipld ノード オブジェクトを取得します。 提案Nd、エラー:= cborutil.AsIpld(proposal.DealProposal) マイナートランザクションオブジェクトを生成します。 取引:= &storagemarket.MinerDeal{
クライアント: s.RemotePeer(),
マイナー: p.net.ID(),
クライアント取引提案: *proposal.DealProposal、
提案Cid: proposalNd.Cid(),
状態: storagemarket.StorageDealUnknown、
参照: 提案書、
} fsm 状態グループのBegin メソッドを呼び出して、状態マシンを生成し、マイナー トランザクション オブジェクトの追跡を開始します。 エラー = p.deals.Begin(proposalNd.Cid(), 取引) ストリーム オブジェクトを接続マネージャーに保存します。 エラー = p.conns.AddStream(proposalNd.Cid(), s) トランザクション オブジェクトの処理を開始するために、fsm 状態グループにイベントを送信します。 p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen) を返します。 プロセッサがProviderEventOpen 状態イベントを受信すると、初期状態がデフォルト値 0、つまりStorageDealUnknown であるため、イベント プロセッサ オブジェクトは内部処理を通じて対応する宛先状態をStorageDealValidating として検出し、処理関数ValidateDealProposal 関数を呼び出して処理します。
1. `ValidateDealProposal`関数この関数は、トランザクション提案オブジェクトを検証するために使用されます。 Lotus Provider アダプター オブジェクトのGetChainHead メソッドを呼び出して、ブロックチェーンの最上部のチップセット キーと高さを取得します。 tok、height、err := environment.Node().GetChainHead(ctx.Context()) の場合、err != nil {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("最新の状態 ID の取得中にノードエラーが発生しました: %w", err)) を返します
} クライアントから送信されたトランザクション提案オブジェクトを検証します。検証が失敗した場合は、拒否イベントが送信されます。 エラーの場合:= providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature);エラー != ゼロ {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("StorageDealProposal を検証中: %w", err)) を返します
} トランザクション提案で指定されたマイナーアドレスが正しいことを確認します。正しくない場合は拒否イベントが送信されます。 提案:= deal.Proposalif 提案.Provider != environment.Address() {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("取引のプロバイダーが正しくありません")) を返します
} トランザクションで指定された高さが正しいことを確認します。正しくない場合は拒否イベントが送信されます。 高さ > proposal.StartEpoch-environment.DealAcceptanceBuffer() {の場合
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("取引開始エポックが早すぎるか、取引がすでに期限切れです")) を返します
} 請求が正常かどうかを確認し、正常でない場合は拒否イベントを送信します。 最小価格:= big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30))
提案のStoragePricePerEpoch.LessThan(minPrice)の場合{
ctx.Trigger(storagemarket.ProviderEventDealRejected, を返します。
xerrors.Errorf("エポックあたりのストレージ価格が提示価格より低い: %s < %s", proposal.StoragePricePerEpoch, minPrice))
} トランザクションのサイズが一致していることを確認します。一致しない場合は、拒否イベントが送信されます。 提案.PieceSize < environment.Ask().MinPieceSize {の場合
ctx.Trigger(storagemarket.ProviderEventDealRejected, を返します。
xerrors.Errorf("ピースのサイズが最小必要サイズより小さい: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize))
}proposal.PieceSize > environment.Ask().MaxPieceSize の場合 {
ctx.Trigger(storagemarket.ProviderEventDealRejected, を返します。
xerrors.Errorf("ピース サイズが最大許容サイズを超えています: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize))
} 顧客の資金へのアクセス。 clientMarketBalance、エラー:= environment.Node().GetBalance(ctx.Context(), proposal.Client, tok)
err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("クライアント市場残高の取得に失敗しました: %w", err)) を返します
} クライアントの利用可能な資金が合計取引手数料より少ない場合、拒否イベントが送信されます。 clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) の場合 {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available が小さすぎます") を返します)
} トランザクションが検証された場合、それは検証されます。 fsm コンテキスト オブジェクトのTrigger メソッドはイベントを送信します。 ctx.Trigger(storagemarket.ProviderEventDealDeciding) を返します。 ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealUnknown からStorageDealAcceptWait に変更し、処理関数DecideOnProposal を呼び出してトランザクションを受け入れるかどうかを決定します。
2. 「DecideOnProposal」関数この関数は、トランザクションを承認するか拒否するかを決定するために使用されます。 環境オブジェクトのRunCustomDecisionLogic メソッドを呼び出してカスタム ロジックを実行し、顧客トランザクションが受け入れられていないことを確認します。 accept、reason、err := environment.RunCustomDecisionLogic(ctx.Context()、deal)、err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("カスタム取引決定ロジックが失敗しました: %w", err)) を返します
} 受け入れられない場合は、拒否イベントが送信されます。 受け入れる場合
ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason)) を返します。
} 環境オブジェクトのSendSignedResponse メソッドを呼び出して、署名された応答をクライアントに送信します。 エラー = environment.SendSignedResponse(ctx.Context(), &network.Response{
状態: storagemarket.StorageDealWaitingForData、
提案: deal.ProposalCid、
})エラー!= nilの場合{
ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) を返します
} このメソッドは、対応するストリームを見つけ、応答に署名し、署名された応答オブジェクトを生成し、最後にストリームを通じて応答を送信します。 クライアントを切断します。 エラーの場合:= environment.Disconnect(deal.ProposalCid);エラー != ゼロ {
log.Warnf("クライアント接続を閉じています: %+v", err)
} イベントを送信するには、fsm コンテキスト オブジェクトのTrigger メソッドを呼び出します。 ctx.Trigger(storagemarket.ProviderEventDataRequested) を返します ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealAcceptWait からStorageDealWaitingForData に変更します。指定された処理関数がないため、処理のために関数が呼び出されることはなく、データ送信プロセスがイベントを送信するのを待ち続けます。 データ転送が開始されると、データ転送コンポーネントはProviderEventDataTransferInitiated イベントを送信します。イベント ハンドラーは状態をStorageDealWaitingForData からStorageDealTransferring に変更します。指定された処理関数がないため、処理のために関数が呼び出されず、プロセスはデータ転送プロセスがイベントを送信するのを待機し続けます。 データ転送が完了すると、データ転送コンポーネントはProviderEventDataTransferCompleted イベントを送信し、イベント ハンドラーは状態をStorageDealTransferring からStorageDealVerifyData に変更し、処理関数VerifyData を呼び出してデータを検証します。
3. `VerifyData`関数この関数は、受信したデータがトランザクション提案の pieceCID と一致することを確認します。 VerifyData 関数のフローは次のとおりです。 環境オブジェクトのGeneratePieceCommitmentToFile メソッドを呼び出して、フラグメントの CID、フラグメントのディレクトリ、およびメタデータ ディレクトリを生成します。 pieceCid、piecePath、metadataPath、err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root、shared.AllSelector()) GeneratePieceCommitmentToFile メソッドは次のとおりです。 ファイル ストレージ オブジェクトのCreateTemp メソッドを呼び出して、一時ファイルを作成します。 f, エラー:= pio.store.CreateTemp() クリーンアップ関数を生成します。 クリーンアップ:=関数(){
f.閉じる()
_ = pio.store.Delete(f.Path())
} 指定された CID の内容を基になるストレージ オブジェクトから取得し、指定されたファイルに書き込みます。 エラー = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, セレクター, f, userOnNewCarBlocks...) ファイル サイズ、つまりフラグメント サイズを取得します。 ピースサイズ:= uint64(f.Size()) ファイルの先頭に移動します。 _, エラー = f.Seek(0, io.SeekStart) ファイルの内容を使用してシャード ID を生成します。 コミットメント、paddedSize、err := GeneratePieceCommitment(rt、f、pieceSize) ファイルを閉じます。 _ = f.Close() フラグメント CID とファイル パスを返します。 コミットメント、f.Path()、paddedSize、nil を返す
マイナーがuniversalRetrievalEnabled フラグを設定すると、 GeneratePieceCommitmentWithMetadata 関数が処理のために直接呼び出されます。 ppuniversalRetrievalEnabledの場合{
providerutils.GeneratePieceCommitmentWithMetadata(ppfs、pppio.GeneratePieceCommitmentToFile、ppproofType、payloadCid、セレクタ) を返します。
} universalRetrievalEnabled フラグが true の場合、ストレージ マイナーはシャード内のすべての CID を追跡するため、ルート CID だけでなくすべての CID を取得できます。 それ以外の場合は、処理のためにピース IO オブジェクトのGeneratePieceCommitmentToFile メソッドを呼び出します。 pieceCid、piecePath、_、err := pppio.GeneratePieceCommitmentToFile(ppproofType、payloadCid、セレクタ) payloadCid ルート CID を示します。 pieceIO オブジェクトのGeneratePieceCommitmentToFile メソッドは、これを次のように処理します。 フラグメント CID とフラグメント パスを返します。 pieceCid、piecePath、filestore.Path("")、err を返します
生成されたシャード CID が、マイナー トランザクション内のトランザクション提案のシャード CID と一致していることを確認します。一致しない場合は拒否イベントが送信されます。 pieceCid != deal.Proposal.PieceCID {の場合
ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("提案 CommP が計算された CommP と一致しません") を返します)
}
3. fsm コンテキスト オブジェクトのTrigger メソッドを呼び出してイベントを送信します。 return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath) ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態を `StorageDealVerifyData` から `StorageDealEnsureProviderFunds` に変更し、その処理関数 `EnsureProviderFunds` を呼び出して、トランザクションを受け入れるかどうかを決定します。同時に、処理関数を呼び出す前に、`Action` 関数を通じて、マイナー トランザクション オブジェクトの 2 つのプロパティ `PiecePath` と `MetadataPath` を変更します。 4. `EnsureProviderFunds`関数この関数は、マイナーが現在のトランザクションを処理するのに十分な資金を持っているかどうかを判断するために使用されます。 Lotus プロバイダー アダプターを取得します。 ノード:=環境.Node() ブロックチェーンの上部にあるチップセットに対応するキーと高さを取得します。 tok, _, err := node.GetChainHead(ctx.Context()) の場合 err != nil {
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("チェーン ヘッドを取得しています: %w", err)) を返します
} マイナーのワーカーアドレスを取得します。 waddr、err := node.GetMinerWorkerAddress(ctx.Context()、deal.Proposal.Provider、tok) の場合、err != nil {
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("マイナーワーカーを検索しています: %w", err)) を返します
} Lotus プロバイダー アダプターのEnsureFunds メソッドを呼び出して、マイナーに現在のトランザクションを処理するのに十分な資金があることを確認します。 mcid、err := node.EnsureFunds(ctx.Context()、deal.Proposal.Provider、waddr、deal.Proposal.ProviderCollateral、tok) の場合、err != nil {
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("資金の確保: %w", err)) を返します
} 返されたmcid が空の場合、リアルタイムで確認されたことを意味し、fsm コンテキスト オブジェクトのTrigger メソッドが呼び出され、イベントが送信されます。 mcid == cid.Undefの場合{
ctx.Trigger(storagemarket.ProviderEventFunded) を返します
} それ以外の場合は、fsm コンテキスト オブジェクトのTrigger メソッドを呼び出して別のイベントを送信します。 ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid) を返します。 ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealEnsureProviderFunds からStorageDealProviderFunding に変更し、処理関数WaitForFunding を呼び出して、次のメッセージがチェーンにアップロードされるのを待機します。同時に、処理関数を呼び出す前に、 Action 関数を通じてマイナー トランザクション オブジェクトのPublishCid プロパティを変更します。
5. `WaitForFunding`関数この関数は、メッセージがアップロードされるのを待機するために使用されます。メッセージがチェーンにアップロードされた後、fsm コンテキスト オブジェクトのTrigger メソッドが呼び出され、イベントが送信されます。 機能内容は以下のとおりです。 node := environment.Node()return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error {
err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds エラーが発生しました: %w", err)) を返します
}
コード!=終了コード.Okの場合{
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds 終了コード: %s", code.String())) を返します
}
ctx.Trigger(storagemarket.ProviderEventFunded) を返します
}) ステート マシンがProviderEventFunded イベントを受信すると、イベント ハンドラーは状態をStorageDealProviderFunding からStorageDealPublish に変更し、その処理関数PublishDeal 呼び出してトランザクション情報をチェーンにアップロードします。同時に、処理関数を呼び出す前に、 Action 関数を通じてマイナー トランザクション オブジェクトのPublishCid プロパティを変更します。 6. `PublishDeal`関数この機能は主にトランザクション情報をチェーンに送信するために使用されます。 マイナートランザクションオブジェクトを生成します。 smDeal := storagemarket.MinerDeal{
クライアント: deal.Client、
クライアント取引提案: deal.ClientDealProposal、
提案Cid: deal.ProposalCid、
状態: deal.State、
参照: deal.Ref、
} Lotus プロバイダー アダプター オブジェクトのPublishDeals 呼び出して、トランザクション情報をチェーンにアップロードします。 mcid、エラー:= environment.Node().PublishDeals(ctx.Context(), smDeal)
err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("公開取引: %w", err)) を返します
} イベントを送信するには、fsm コンテキスト オブジェクトのTrigger メソッドを呼び出します。 ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid) を返します。 ステート マシンがこのイベントを受信すると、イベント プロセッサは状態をStorageDealPublish からStorageDealPublishing に変更し、処理関数WaitForPublish を呼び出して、トランザクション情報がチェーンにアップロードされるのを待機します。
7. `WaitForPublish` 関数この関数は、トランザクション情報がチェーンにアップロードされるのを待機し、クライアントに応答を送信し、クライアントから切断するために使用されます。最後に、fsm コンテキスト オブジェクトのTrigger メソッドが呼び出され、イベント処理を通じてイベント オブジェクトが生成され、イベント オブジェクトがステート マシンに送信されます。ここで生成されるイベント オブジェクト名はProviderEventDealPublished です。 ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealPublishing からStorageDealStaged に変更し、処理関数HandoffDeal を呼び出してセクター シーリング プロセスを開始します。同時に、処理関数を呼び出す前に、 Action 関数を通じてマイナー トランザクション オブジェクトのConnectionClosed プロパティとDealID プロパティを変更します。 戻り値 environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error {
err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals エラーが発生しました: %w", err)) を返します
}
コード!=終了コード.Okの場合{
ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals 終了コード: %s", code.String())) を返します
}
var retval market.PublishStorageDealsReturn
エラー = retval.UnmarshalCBOR(bytes.NewReader(retBytes))
err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals エラー アンマーシャリング結果: %w", err)) を返します
} ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) を返します
}) 8. `HandoffDeal`関数この関数はマイナーのProvide アダプタを呼び出す フラグメント パスを使用してファイル オブジェクトを生成します。 ファイル、err := environment.FileStore().Open(deal.PiecePath)、err != nil の場合 {
ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("パス %s でピースを読み取り中: %w", deal.PiecePath, err)) を返します
} フラグメント ファイル ストリームを使用してフラグメント ストリームを生成します。 paddedReader、paddedSize := padreader.New(file、uint64(file.Size())) Lotus Provider アダプタ オブジェクトのOnDealComplete メソッドを呼び出して、トランザクションが完了したことを通知し、フラグメントをセクターに追加します。 エラー = environment.Node().OnDealComplete(
ctx.コンテキスト()、
ストレージマーケット.MinerDeal{
クライアント: deal.Client、
クライアント取引提案: deal.ClientDealProposal、
提案Cid: deal.ProposalCid、
状態: deal.State、
参照: deal.Ref、
取引ID: deal.DealID、
高速検索: deal.FastRetrieval、
ピースパス: filestore.Path(environment.FileStore().Filename(deal.PiecePath)),
},
パディングサイズ、
パデッドリーダー、
)エラー!= nilの場合{
ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) を返します
} イベントを送信するには、fsm コンテキスト オブジェクトのTrigger メソッドを呼び出します。 ctx.Trigger(storagemarket.ProviderEventDealHandedOff) を返します ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealStaged からStorageDealSealing に変更し、処理関数VerifyDealActivated を呼び出してセクター シーリングの結果を待機します。
9. `VerifyDealActivated`関数コールバック関数を生成します。 cb := func(err エラー) {
err != nil の場合 {
_ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed、エラー)
} それ以外 {
_ = ctx.Trigger(ストレージマーケット.ProviderEventDealActivated)
}
} Lotus Provider アダプタ オブジェクトは、トランザクション オブジェクトの変更を検出すると、このコールバック関数を呼び出して、対応するイベントを送信します。 ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealSealing からStorageDealActive に変更し、処理関数RecordPieceInfo を呼び出して関連情報を記録します。 セクターがコミットされるまで待機するには、Lotus プロバイダー アダプター オブジェクトのOnDealSectorCommitted メソッドを呼び出します。 err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb) の場合、 err != nil {
ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) を返します
} 空を返します。 nilを返す
9. `RecordPieceInfo`関数この機能は主に関連情報を記録します。 最後に、fsm コンテキスト オブジェクトのTrigger メソッドが呼び出され、イベント処理を通じてイベント オブジェクトが生成され、イベント オブジェクトがステート マシンに送信されます。ここで生成されるイベント オブジェクト名はProviderEventDealCompleted です。 ステート マシンがこのイベントを受信すると、イベント ハンドラーは状態をStorageDealActive からStorageDealCompleted に変更し、最終的にステート マシンの処理を終了します。 断片化された一時ファイルはここで削除されます。 この記事へのリンク: https://www.8btc.com/article/632253 転載の際は出典を明記してください |