精通Filecoin:Lotus真实数据处理之Provider处理存储
当Client
接收到用户的存储交易,创建一个 /fil/storage/mk/1.0.1
协议的流,再通过流发送存储交易,去处理这个协议的正是 HandleDealStream
方法。这个方法直接调用自身的 receiveDeal
方法进行处理。receiveDeal
方法处理如下:
从流中读取存储提案 Proposal
对象。
proposal, err := s.ReadDealProposal()
这里的流对象是 dealStream
对象(storagemarket/network/deal_stream.go),这个对象对原始流对象进行了封装。
获取 ipld node 对象。
proposalNd, err := cborutil.AsIpld(proposal.DealProposal)
生成矿工交易对象。
deal := &storagemarket.MinerDeal{ Client: s.RemotePeer(), Miner: p.net.ID(), ClientDealProposal: *proposal.DealProposal, ProposalCid: proposalNd.Cid(), State: storagemarket.StorageDealUnknown, Ref: proposal.Piece, }
调用 fsm 状态组的 Begin
的方法,生成一个状态机,并开始跟踪矿工交易对象。
err = p.deals.Begin(proposalNd.Cid(), deal)
保存流对象到连接管理器中。
err = p.conns.AddStream(proposalNd.Cid(), s)
发送事件到 fsm 状态组,从而开始对交易对象进行处理。
return p.deals.Send(proposalNd.Cid(), storagemarket.ProviderEventOpen)
当处理机收到 ProviderEventOpen
状态事件时,因为初始状态为默认值 0,即 StorageDealUnknown
,事件处理器对象经过内部处理找到对应的目的状态为 StorageDealValidating
,从而调用其处理函数 ValidateDealProposal
函数进行处理。
1、`ValidateDealProposal` 函数
这个函数用来验证交易提案对象。
调用 Lotus Provider 适配器对象的 GetChainHead
方法,获取区块链顶部 tipset key 和其高度。
tok, height, err := environment.Node().GetChainHead(ctx.Context())
if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting most recent state id: %w", err)) }
验证客户发送的交易提案对象。如果验证不通过,则发送拒绝事件。
if err := providerutils.VerifyProposal(ctx.Context(), deal.ClientDealProposal, tok, environment.Node().VerifySignature); err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("verifying StorageDealProposal: %w", err)) }
检查交易提案中指定的矿工地址是否正确。如果不正确,则发送拒绝事件。
proposal := deal.Proposal
if proposal.Provider != environment.Address() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("incorrect provider for deal")) }
检查交易指定的高度是否正确。如果不正确,则发送拒绝事件。
if height > proposal.StartEpoch-environment.DealAcceptanceBuffer() { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("deal start epoch is too soon or deal already expired")) }
检查费用是否OK,如果不OK,则发送拒绝事件。
minPrice := big.Div(big.Mul(environment.Ask().Price, abi.NewTokenAmount(int64(proposal.PieceSize))), abi.NewTokenAmount(1<<30)) if proposal.StoragePricePerEpoch.LessThan(minPrice) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("storage price per epoch less than asking price: %s < %s", proposal.StoragePricePerEpoch, minPrice)) }
检查交易的大小是否匹配。如果不匹配,则发送拒绝事件。
if proposal.PieceSize < environment.Ask().MinPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size less than minimum required size: %d < %d", proposal.PieceSize, environment.Ask().MinPieceSize)) }
if proposal.PieceSize > environment.Ask().MaxPieceSize { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("piece size more than maximum allowed size: %d > %d", proposal.PieceSize, environment.Ask().MaxPieceSize)) }
获取客户的资金。
clientMarketBalance, err := environment.Node().GetBalance(ctx.Context(), proposal.Client, tok) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("node error getting client market balance failed: %w", err)) }
如果客户可用资金小于总的交易费用,则发送拒绝事件。
if clientMarketBalance.Available.LessThan(proposal.TotalStorageFee()) { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.New("clientMarketBalance.Available too small")) }
如果交易是验证过的,则进行验证。
fsm 上下文对象的 Trigger
方法,发送事件。
return ctx.Trigger(storagemarket.ProviderEventDealDeciding)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealUnknown
修改为 StorageDealAcceptWait
,从而调用其处理函数 DecideOnProposal
确定是否接收交易。
2、`DecideOnProposal` 函数
这个函数用来决定接受或拒绝交易。
调用环境对象的 RunCustomDecisionLogic
方法,运行自定义逻辑来验证是不接收客户交易。
accept, reason, err := environment.RunCustomDecisionLogic(ctx.Context(), deal)
if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("custom deal decision logic failed: %w", err)) }
如果不接收,则发送拒绝事件。
if !accept { return ctx.Trigger(storagemarket.ProviderEventDealRejected, fmt.Errorf(reason)) }
调用环境对象的 SendSignedResponse
方法,发送签名的响应给客户端。
err = environment.SendSignedResponse(ctx.Context(), &network.Response{ State: storagemarket.StorageDealWaitingForData, Proposal: deal.ProposalCid, })
if err != nil { return ctx.Trigger(storagemarket.ProviderEventSendResponseFailed, err) }
这个方法找到对应的流,然后对响应进行签名,生成签名的响应对象,最后通过流发送响应。
断开与客户端的连接。
if err := environment.Disconnect(deal.ProposalCid); err != nil { log.Warnf("closing client connection: %+v", err) }
调用 fsm 上下文对象的 Trigger
方法,发送一个事件。
return ctx.Trigger(storagemarket.ProviderEventDataRequested)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealAcceptWait
修改为 StorageDealWaitingForData
,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。
当数据开始传输时,数据传输组件发送 ProviderEventDataTransferInitiated
事件,经过事件处理器把状态从 StorageDealWaitingForData
修改为 StorageDealTransferring
,因为没有指定的处理函数,从而不会调用函数进行处理,一直等待数据传输过程发送事件。
当数据传输完成时,数据传输组件发送 ProviderEventDataTransferCompleted
事件,经过事件处理器把状态从 StorageDealTransferring
修改为 StorageDealVerifyData
,从而调用其处理函数 VerifyData
验证数据。
3、`VerifyData` 函数
这个函数验证接受到的数据与交易提案中的 pieceCID 相匹配。
VerifyData
函数流程如下:
调用环境对象的 GeneratePieceCommitmentToFile
方法,生成碎片的 CID 、碎片所在目录和元数据目录。
pieceCid, piecePath, metadataPath, err := environment.GeneratePieceCommitmentToFile(deal.Ref.Root, shared.AllSelector())
GeneratePieceCommitmentToFile
方法内容如下:
调用文件存储对象的 CreateTemp
方法,创建一个临时文件。
f, err := pio.store.CreateTemp()
生成一个清理函数。
cleanup := func() { f.Close() _ = pio.store.Delete(f.Path()) }
从底层存储对象中获取指定 CID 的内容,然后写入指定文件。
err = pio.carIO.WriteCar(context.Background(), pio.bs, payloadCid, selector, f, userOnNewCarBlocks...)
获取文件大小,即碎片大小。
pieceSize := uint64(f.Size())
定位到文件开头位置。
_, err = f.Seek(0, io.SeekStart)
使用文件内容生成碎片 ID。
commitment, paddedSize, err := GeneratePieceCommitment(rt, f, pieceSize)
关闭文件。
_ = f.Close()
返回碎片 CID 和文件路径。
return commitment, f.Path(), paddedSize, nil
如果矿工设置了 universalRetrievalEnabled
标志,则直接调用 GeneratePieceCommitmentWithMetadata
函数进行处理。
if p.p.universalRetrievalEnabled { return providerutils.GeneratePieceCommitmentWithMetadata(p.p.fs, p.p.pio.GeneratePieceCommitmentToFile, p.p.proofType, payloadCid, selector) }
universalRetrievalEnabled
标志如果为真,则存储矿工会跟踪碎片中的所有 CID,因此对于所有 CID 都可以被检索,而不仅是 Root CID。
否则,调用 piece IO 对象的 GeneratePieceCommitmentToFile
方法进行处理。
pieceCid, piecePath, _, err := p.p.pio.GeneratePieceCommitmentToFile(p.p.proofType, payloadCid, selector)
payloadCid
表示根 Root CID。
piece IO 对象的 GeneratePieceCommitmentToFile
方法处理如下:
返回碎片 CID 和碎片路径。
return pieceCid, piecePath, filestore.Path(""), err
验证生成的碎片 CID 和矿工交易中交易提案的碎片 CID是否一致。如果不一致,则发送拒绝事件。
if pieceCid != deal.Proposal.PieceCID { return ctx.Trigger(storagemarket.ProviderEventDealRejected, xerrors.Errorf("proposal CommP doesn't match calculated CommP")) }3. 调用 fsm 上下文对象的
Trigger
方法,发送一个事件。return ctx.Trigger(storagemarket.ProviderEventVerifiedData, piecePath, metadataPath)
当状态机收到这个事件后,经过事件处理器把状态从 `StorageDealVerifyData` 修改为 `StorageDealEnsureProviderFunds`,从而调用其处理函数 `EnsureProviderFunds` 确定是否接收交易。同时,在调用处理函数之前,通过 `Action` 函数,修改矿工交易对象的 `PiecePath` 和 `MetadataPath` 两个属性。
4、`EnsureProviderFunds` 函数
这个函数用来确定矿工有足够的资金来处理当前交易。
获取 Lotus Provider 适配器。
node := environment.Node()
获取区块链顶部 tipset 对应的 key 和高度。
tok, _, err := node.GetChainHead(ctx.Context())
if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("acquiring chain head: %w", err)) }
获取矿工的 worker 地址。
waddr, err := node.GetMinerWorkerAddress(ctx.Context(), deal.Proposal.Provider, tok)
if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("looking up miner worker: %w", err)) }
调用 Lotus Provider 适配器的 EnsureFunds
方法,确保矿工有足够的资金来处理当前交易。
mcid, err := node.EnsureFunds(ctx.Context(), deal.Proposal.Provider, waddr, deal.Proposal.ProviderCollateral, tok)
if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("ensuring funds: %w", err)) }
如果返回的 mcid
是空的,那么意味着已经实时确认,则调用 fsm 上下文对象的 Trigger
方法,发送一个事件。
if mcid == cid.Undef { return ctx.Trigger(storagemarket.ProviderEventFunded) }
否则,调用 fsm 上下文对象的 Trigger
方法,发送另一个事件。
return ctx.Trigger(storagemarket.ProviderEventFundingInitiated, mcid)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealEnsureProviderFunds
修改为 StorageDealProviderFunding
,从而调用其处理函数 WaitForFunding
等待产一步的消息上链。同时,在调用处理函数之前,通过 Action
函数,修改矿工交易对象的 PublishCid
属性。
5、`WaitForFunding` 函数
这个函数用来等待消息上链。消息上链之后,调用 fsm 上下文对象的 Trigger
方法,发送一个事件。
函数内容如下:
node := environment.Node()
return node.WaitForMessage(ctx.Context(), *deal.AddFundsCid, func(code exitcode.ExitCode, bytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds errored: %w", err)) } if code != exitcode.Ok { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("AddFunds exit code: %s", code.String())) } return ctx.Trigger(storagemarket.ProviderEventFunded) })
当状态机收到 ProviderEventFunded
这个事件后,经过事件处理器把状态从 StorageDealProviderFunding
修改为 StorageDealPublish
,从而调用其处理函数 PublishDeal
把交易信息上链。同时,在调用处理函数之前,通过 Action
函数,修改矿工交易对象的 PublishCid
属性。
6、`PublishDeal` 函数
这个函数主要用来提交交易信息上链。
生成矿工交易对象。
smDeal := storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, }
调用 Lotus Provider 适配器对象的 PublishDeals
把交易信息上链。
mcid, err := environment.Node().PublishDeals(ctx.Context(), smDeal) if err != nil { return ctx.Trigger(storagemarket.ProviderEventNodeErrored, xerrors.Errorf("publishing deal: %w", err)) }
调用 fsm 上下文对象的 Trigger
方法,发送事件。
return ctx.Trigger(storagemarket.ProviderEventDealPublishInitiated, mcid)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublish
修改为 StorageDealPublishing
,从而调用其处理函数 WaitForPublish
等待交易信息上链。
7、`WaitForPublish` 函数
这个函数用来等待交易信息上链,然后给客户端发送响应,然后断开与客户端的连接。最后调用 fsm 上下文对象的 Trigger
方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealPublished
。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealPublishing
修改为 StorageDealStaged
,从而调用其处理函数 HandoffDeal
开始扇区密封处理。同时,在调用处理函数之前,通过 Action
函数,修改矿工交易对象的 ConnectionClosed
和 DealID
属性。
return environment.Node().WaitForMessage(ctx.Context(), *deal.PublishCid, func(code exitcode.ExitCode, retBytes []byte, err error) error { if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals errored: %w", err)) } if code != exitcode.Ok { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals exit code: %s", code.String())) } var retval market.PublishStorageDealsReturn err = retval.UnmarshalCBOR(bytes.NewReader(retBytes)) if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealPublishError, xerrors.Errorf("PublishStorageDeals error unmarshalling result: %w", err)) }
return ctx.Trigger(storagemarket.ProviderEventDealPublished, retval.IDs[0]) })
8、`HandoffDeal` 函数
这个函数调用 miner 的 Provide
适配器的
使用碎片路径生成文件对象。
file, err := environment.FileStore().Open(deal.PiecePath)
if err != nil { return ctx.Trigger(storagemarket.ProviderEventFileStoreErrored, xerrors.Errorf("reading piece at path %s: %w", deal.PiecePath, err)) }
使用碎片文件流生成碎片流。
paddedReader, paddedSize := padreader.New(file, uint64(file.Size()))
调用 Lotus Provider 适配器对象的 OnDealComplete
方法,通知交易已经完成,从而把碎片加入某个扇区中。
err = environment.Node().OnDealComplete( ctx.Context(), storagemarket.MinerDeal{ Client: deal.Client, ClientDealProposal: deal.ClientDealProposal, ProposalCid: deal.ProposalCid, State: deal.State, Ref: deal.Ref, DealID: deal.DealID, FastRetrieval: deal.FastRetrieval, PiecePath: filestore.Path(environment.FileStore().Filename(deal.PiecePath)), }, paddedSize, paddedReader, )
if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealHandoffFailed, err) }
调用 fsm 上下文对象的 Trigger
方法,发送事件。
return ctx.Trigger(storagemarket.ProviderEventDealHandedOff)
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealStaged
修改为 StorageDealSealing
,从而调用其处理函数 VerifyDealActivated
等待扇区密封结果。
9、`VerifyDealActivated` 函数
生成回调函数。
cb := func(err error) { if err != nil { _ = ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) } else { _ = ctx.Trigger(storagemarket.ProviderEventDealActivated) } }
当 Lotus Provider 适配器对象检查到交易对象变化时会调用这个回调函数,从而发送相应的事件。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealSealing
修改为 StorageDealActive
,从而调用其处理函数 RecordPieceInfo
记录相关信息。
调用 Lotus Provider 适配器对象的 OnDealSectorCommitted
方法,等待扇区被提交。
err := environment.Node().OnDealSectorCommitted(ctx.Context(), deal.Proposal.Provider, deal.DealID, cb)
if err != nil { return ctx.Trigger(storagemarket.ProviderEventDealActivationFailed, err) }
返回空。
return nil
9、`RecordPieceInfo` 函数
这个函数主要记录相关信息。
最后调用 fsm 上下文对象的 Trigger
方法,通过事件处理生成一个事件对象,然后发送事件对象到状态机。此处生成的事件对象名称为 ProviderEventDealCompleted
。
当状态机收到这个事件后,经过事件处理器把状态从 StorageDealActive
修改为 StorageDealCompleted
,最终结束状态机处理。
这里会删除碎片的临时文件。
Scan QR code with WeChat