Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ require (
github.com/hashicorp/golang-lru v1.0.2
github.com/hyperledger/firefly-common v1.5.6-0.20250630201730-e234335c0381
github.com/hyperledger/firefly-signer v1.1.21
github.com/hyperledger/firefly-transaction-manager v1.4.0
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910153533-14142cf9f697
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.9.0
Expand Down
8 changes: 8 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ github.com/hyperledger/firefly-signer v1.1.21 h1:r7cTOw6e/6AtiXLf84wZy6Z7zppzlc1
github.com/hyperledger/firefly-signer v1.1.21/go.mod h1:axrlSQeKrd124UdHF5L3MkTjb5DeTcbJxJNCZ3JmcWM=
github.com/hyperledger/firefly-transaction-manager v1.4.0 h1:l9DCizLTohKtKec5dewNlydhAeko1/DmTfCRF8le9m0=
github.com/hyperledger/firefly-transaction-manager v1.4.0/go.mod h1:mEd9dOH8ds6ajgfPh6nnP3Pd3f8XIZtQRnucqAIJHRs=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910122026-4f65f76eb9eb h1:doSlc4SN1LIA+kMMW/vBDHaud+5Ad5+eWRitbfGBxho=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910122026-4f65f76eb9eb/go.mod h1:KHGvK/QqD2jpRZ04lQoX/k1T2o644NCkRlr3FbvKqnA=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910151057-7bc7bb81591c h1:RNd7cMvH8Mr/wE2Y2B4Vy0+5l0FN4G6UMC4rMqJeFjE=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910151057-7bc7bb81591c/go.mod h1:KHGvK/QqD2jpRZ04lQoX/k1T2o644NCkRlr3FbvKqnA=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910153251-07127ff35b09 h1:1Frz0u69ETPkFbFGcLxPyWJrzUnBv+yVKNdZUfT7wBE=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910153251-07127ff35b09/go.mod h1:KHGvK/QqD2jpRZ04lQoX/k1T2o644NCkRlr3FbvKqnA=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910153533-14142cf9f697 h1:leUNAoiwMidZYwH+F6bmCa7kvN3qcPGH25k2HcMptyg=
github.com/hyperledger/firefly-transaction-manager v1.4.1-0.20250910153533-14142cf9f697/go.mod h1:KHGvK/QqD2jpRZ04lQoX/k1T2o644NCkRlr3FbvKqnA=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
134 changes: 69 additions & 65 deletions internal/ethereum/blocklistener.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,17 @@ type blockListener struct {
initialBlockHeightObtained chan struct{}
newHeadsTap chan struct{}
newHeadsSub rpcbackend.Subscription
highestBlock int64
mux sync.Mutex
highestBlockSet bool
highestBlock uint64
mux sync.RWMutex
consumers map[fftypes.UUID]*blockUpdateConsumer
blockPollingInterval time.Duration
unstableHeadLength int
canonicalChain *list.List
hederaCompatibilityMode bool
blockCache *lru.Cache
}

type minimalBlockInfo struct {
number int64
hash string
parentHash string
// canonical chain
unstableHeadLength int
canonicalChain *list.List
}

func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section, wsConf *wsclient.WSConfig) (bl *blockListener, err error) {
Expand All @@ -81,7 +78,8 @@ func newBlockListener(ctx context.Context, c *ethConnector, conf config.Section,
startDone: make(chan struct{}),
initialBlockHeightObtained: make(chan struct{}),
newHeadsTap: make(chan struct{}),
highestBlock: -1,
highestBlockSet: false,
highestBlock: 0,
consumers: make(map[fftypes.UUID]*blockUpdateConsumer),
blockPollingInterval: conf.GetDuration(BlockPollingInterval),
canonicalChain: list.New(),
Expand Down Expand Up @@ -165,10 +163,7 @@ func (bl *blockListener) establishBlockHeightWithRetry() error {
return true, rpcErr.Error()
}

bl.mux.Lock()
bl.highestBlock = hexBlockHeight.BigInt().Int64()
bl.mux.Unlock()

bl.setHighestBlock(hexBlockHeight.BigInt().Uint64())
return false, nil
})
}
Expand Down Expand Up @@ -260,15 +255,15 @@ func (bl *blockListener) listenLoop() {
default:
candidate := bl.reconcileCanonicalChain(bi)
// Check this is the lowest position to notify from
if candidate != nil && (notifyPos == nil || candidate.Value.(*minimalBlockInfo).number <= notifyPos.Value.(*minimalBlockInfo).number) {
if candidate != nil && (notifyPos == nil || candidate.Value.(*ffcapi.MinimalBlockInfo).BlockNumber <= notifyPos.Value.(*ffcapi.MinimalBlockInfo).BlockNumber) {
notifyPos = candidate
}
}
}
if notifyPos != nil {
// We notify for all hashes from the point of change in the chain onwards
for notifyPos != nil {
update.BlockHashes = append(update.BlockHashes, notifyPos.Value.(*minimalBlockInfo).hash)
update.BlockHashes = append(update.BlockHashes, notifyPos.Value.(*ffcapi.MinimalBlockInfo).BlockHash)
notifyPos = notifyPos.Next()
}

Expand All @@ -295,16 +290,12 @@ func (bl *blockListener) listenLoop() {
// head of the canonical chain we have. If these blocks do not just fit onto the end of the chain, then we
// work backwards building a new view and notify about all blocks that are changed in that process.
func (bl *blockListener) reconcileCanonicalChain(bi *blockInfoJSONRPC) *list.Element {
mbi := &minimalBlockInfo{
number: bi.Number.BigInt().Int64(),
hash: bi.Hash.String(),
parentHash: bi.ParentHash.String(),
mbi := &ffcapi.MinimalBlockInfo{
BlockNumber: fftypes.FFuint64(bi.Number.BigInt().Uint64()),
BlockHash: bi.Hash.String(),
ParentHash: bi.ParentHash.String(),
}
bl.mux.Lock()
if mbi.number > bl.highestBlock {
bl.highestBlock = mbi.number
}
bl.mux.Unlock()
bl.checkAndSetHighestBlock(mbi.BlockNumber.Uint64())

// Find the position of this block in the block sequence
pos := bl.canonicalChain.Back()
Expand All @@ -313,15 +304,15 @@ func (bl *blockListener) reconcileCanonicalChain(bi *blockInfoJSONRPC) *list.Ele
// We've eliminated all the existing chain (if there was any)
return bl.handleNewBlock(mbi, nil)
}
posBlock := pos.Value.(*minimalBlockInfo)
posBlock := pos.Value.(*ffcapi.MinimalBlockInfo)
switch {
case posBlock.number == mbi.number && posBlock.hash == mbi.hash && posBlock.parentHash == mbi.parentHash:
case posBlock.Equal(mbi):
// This is a duplicate - no need to notify of anything
return nil
case posBlock.number == mbi.number:
case posBlock.BlockNumber.Uint64() == mbi.BlockNumber.Uint64():
// We are replacing a block in the chain
return bl.handleNewBlock(mbi, pos.Prev())
case posBlock.number < mbi.number:
case posBlock.BlockNumber.Uint64() < mbi.BlockNumber.Uint64():
// We have a position where this block goes
return bl.handleNewBlock(mbi, pos)
default:
Expand All @@ -333,14 +324,14 @@ func (bl *blockListener) reconcileCanonicalChain(bi *blockInfoJSONRPC) *list.Ele

// handleNewBlock rebuilds the canonical chain around a new block, checking if we need to rebuild our
// view of the canonical chain behind it, or trimming anything after it that is invalidated by a new fork.
func (bl *blockListener) handleNewBlock(mbi *minimalBlockInfo, addAfter *list.Element) *list.Element {
func (bl *blockListener) handleNewBlock(mbi *ffcapi.MinimalBlockInfo, addAfter *list.Element) *list.Element {
// If we have an existing canonical chain before this point, then we need to check we've not
// invalidated that with this block. If we have, then we have to re-verify our whole canonical
// chain from the first block. Then notify from the earliest point where it has diverged.
if addAfter != nil {
prevBlock := addAfter.Value.(*minimalBlockInfo)
if prevBlock.number != (mbi.number-1) || prevBlock.hash != mbi.parentHash {
log.L(bl.ctx).Infof("Notified of block %d / %s that does not fit after block %d / %s (expected parent: %s)", mbi.number, mbi.hash, prevBlock.number, prevBlock.hash, mbi.parentHash)
prevBlock := addAfter.Value.(*ffcapi.MinimalBlockInfo)
if prevBlock.BlockNumber.Uint64() != (mbi.BlockNumber.Uint64()-1) || prevBlock.BlockHash != mbi.ParentHash {
log.L(bl.ctx).Infof("Notified of block %d / %s that does not fit after block %d / %s (expected parent: %s)", mbi.BlockNumber.Uint64(), mbi.BlockHash, prevBlock.BlockNumber.Uint64(), prevBlock.BlockHash, mbi.ParentHash)
return bl.rebuildCanonicalChain()
}
}
Expand Down Expand Up @@ -368,7 +359,7 @@ func (bl *blockListener) handleNewBlock(mbi *minimalBlockInfo, addAfter *list.El
_ = bl.canonicalChain.Remove(bl.canonicalChain.Front())
}

log.L(bl.ctx).Debugf("Added block %d / %s parent=%s to in-memory canonical chain (new length=%d)", mbi.number, mbi.hash, mbi.parentHash, bl.canonicalChain.Len())
log.L(bl.ctx).Debugf("Added block %d / %s parent=%s to in-memory canonical chain (new length=%d)", mbi.BlockNumber.Uint64(), mbi.BlockHash, mbi.ParentHash, bl.canonicalChain.Len())

return newElem
}
Expand All @@ -379,18 +370,18 @@ func (bl *blockListener) handleNewBlock(mbi *minimalBlockInfo, addAfter *list.El
func (bl *blockListener) rebuildCanonicalChain() *list.Element {
// If none of our blocks were valid, start from the first block number we've notified about previously
lastValidBlock := bl.trimToLastValidBlock()
var nextBlockNumber int64
var nextBlockNumber uint64
var expectedParentHash string
if lastValidBlock != nil {
nextBlockNumber = lastValidBlock.number + 1
nextBlockNumber = lastValidBlock.BlockNumber.Uint64() + 1
log.L(bl.ctx).Infof("Canonical chain partially rebuilding from block %d", nextBlockNumber)
expectedParentHash = lastValidBlock.hash
expectedParentHash = lastValidBlock.BlockHash
} else {
firstBlock := bl.canonicalChain.Front()
if firstBlock == nil || firstBlock.Value == nil {
return nil
}
nextBlockNumber = firstBlock.Value.(*minimalBlockInfo).number
nextBlockNumber = firstBlock.Value.(*ffcapi.MinimalBlockInfo).BlockNumber.Uint64()
log.L(bl.ctx).Warnf("Canonical chain re-initialized at block %d", nextBlockNumber)
// Clear out the whole chain
bl.canonicalChain = bl.canonicalChain.Init()
Expand All @@ -412,19 +403,19 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
log.L(bl.ctx).Infof("Canonical chain rebuilt the chain to the head block %d", nextBlockNumber-1)
break
}
mbi := &minimalBlockInfo{
number: bi.Number.BigInt().Int64(),
hash: bi.Hash.String(),
parentHash: bi.ParentHash.String(),
mbi := &ffcapi.MinimalBlockInfo{
BlockNumber: fftypes.FFuint64(bi.Number.BigInt().Uint64()),
BlockHash: bi.Hash.String(),
ParentHash: bi.ParentHash.String(),
}

// It's possible the chain will change while we're doing this, and we fall back to the next block notification
// to sort that out.
if expectedParentHash != "" && mbi.parentHash != expectedParentHash {
log.L(bl.ctx).Infof("Canonical chain rebuilding stopped at block: %d due to mismatch hash for parent block (%d): %s (expected: %s)", nextBlockNumber, nextBlockNumber-1, mbi.parentHash, expectedParentHash)
if expectedParentHash != "" && mbi.ParentHash != expectedParentHash {
log.L(bl.ctx).Infof("Canonical chain rebuilding stopped at block: %d due to mismatch hash for parent block (%d): %s (expected: %s)", nextBlockNumber, nextBlockNumber-1, mbi.ParentHash, expectedParentHash)
break
}
expectedParentHash = mbi.hash
expectedParentHash = mbi.BlockHash
nextBlockNumber++

// Note we do not trim to a length here, as we need to notify for every block we haven't notified for.
Expand All @@ -434,33 +425,30 @@ func (bl *blockListener) rebuildCanonicalChain() *list.Element {
notifyPos = newElem
}

bl.mux.Lock()
if mbi.number > bl.highestBlock {
bl.highestBlock = mbi.number
}
bl.mux.Unlock()
bl.checkAndSetHighestBlock(mbi.BlockNumber.Uint64())

}
return notifyPos
}

func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInfo) {
func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *ffcapi.MinimalBlockInfo) {
// First remove from the end until we get a block that matches the current un-cached query view from the chain
lastElem := bl.canonicalChain.Back()
var startingNumber *int64
var startingNumber *uint64
for lastElem != nil && lastElem.Value != nil {

// Query the block that is no at this blockNumber
currentViewBlock := lastElem.Value.(*minimalBlockInfo)
currentViewBlock := lastElem.Value.(*ffcapi.MinimalBlockInfo)
if startingNumber == nil {
startingNumber = &currentViewBlock.number
currentNumber := currentViewBlock.BlockNumber.Uint64()
startingNumber = &currentNumber
log.L(bl.ctx).Debugf("Canonical chain checking from last block: %d", startingNumber)
}
var freshBlockInfo *blockInfoJSONRPC
var reason ffcapi.ErrorReason
err := bl.c.retry.Do(bl.ctx, "rebuild listener canonical chain", func(_ int) (retry bool, err error) {
log.L(bl.ctx).Debugf("Canonical chain validating block: %d", currentViewBlock.number)
freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.number, false, "")
log.L(bl.ctx).Debugf("Canonical chain validating block: %d", currentViewBlock.BlockNumber.Uint64())
freshBlockInfo, reason, err = bl.getBlockInfoByNumber(bl.ctx, currentViewBlock.BlockNumber.Uint64(), false, "")
return reason != ffcapi.ErrorReasonNotFound, err
})
if err != nil {
Expand All @@ -469,8 +457,8 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
}
}

if freshBlockInfo != nil && freshBlockInfo.Hash.String() == currentViewBlock.hash {
log.L(bl.ctx).Debugf("Canonical chain found last valid block %d", currentViewBlock.number)
if freshBlockInfo != nil && freshBlockInfo.Hash.String() == currentViewBlock.BlockHash {
log.L(bl.ctx).Debugf("Canonical chain found last valid block %d", currentViewBlock.BlockNumber.Uint64())
lastValidBlock = currentViewBlock
// Trim everything after this point, as it's invalidated
nextElem := lastElem.Next()
Expand All @@ -484,8 +472,8 @@ func (bl *blockListener) trimToLastValidBlock() (lastValidBlock *minimalBlockInf
lastElem = lastElem.Prev()
}

if startingNumber != nil && lastValidBlock != nil && *startingNumber != lastValidBlock.number {
log.L(bl.ctx).Debugf("Canonical chain trimmed from block %d to block %d (total number of in memory blocks: %d)", startingNumber, lastValidBlock.number, bl.unstableHeadLength)
if startingNumber != nil && lastValidBlock != nil && *startingNumber != lastValidBlock.BlockNumber.Uint64() {
log.L(bl.ctx).Debugf("Canonical chain trimmed from block %d to block %d (total number of in memory blocks: %d)", startingNumber, lastValidBlock.BlockNumber.Uint64(), bl.unstableHeadLength)
}
return lastValidBlock
}
Expand Down Expand Up @@ -522,29 +510,45 @@ func (bl *blockListener) addConsumer(ctx context.Context, c *blockUpdateConsumer
bl.consumers[*c.id] = c
}

func (bl *blockListener) getHighestBlock(ctx context.Context) (int64, bool) {
func (bl *blockListener) getHighestBlock(ctx context.Context) (uint64, bool) {
bl.checkAndStartListenerLoop()
// block height will be established as the first step of listener startup process
// so we don't need to wait for the entire startup process to finish to return the result
bl.mux.Lock()
highestBlock := bl.highestBlock
highestBlockSet := bl.highestBlockSet
bl.mux.Unlock()
// if not yet initialized, wait to be initialized
if highestBlock < 0 {
if !highestBlockSet {
select {
case <-bl.initialBlockHeightObtained:
case <-ctx.Done():
// Inform caller we timed out, or were closed
return -1, false
return 0, false
}
}
bl.mux.Lock()
highestBlock = bl.highestBlock
highestBlock := bl.highestBlock
bl.mux.Unlock()
log.L(ctx).Debugf("ChainHead=%d", highestBlock)
return highestBlock, true
}

func (bl *blockListener) setHighestBlock(block uint64) {
bl.mux.Lock()
defer bl.mux.Unlock()
bl.highestBlock = block
bl.highestBlockSet = true
}

func (bl *blockListener) checkAndSetHighestBlock(block uint64) {
bl.mux.Lock()
defer bl.mux.Unlock()
if block > bl.highestBlock {
bl.highestBlock = block
bl.highestBlockSet = true
}
}

func (bl *blockListener) waitClosed() {
bl.mux.Lock()
listenLoopDone := bl.listenLoopDone
Expand Down
48 changes: 40 additions & 8 deletions internal/ethereum/blocklistener_blockquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,46 @@ func (bl *blockListener) addToBlockCache(blockInfo *blockInfoJSONRPC) {
bl.blockCache.Add(blockInfo.Number.BigInt().String(), blockInfo)
}

func (bl *blockListener) getBlockInfoByNumber(ctx context.Context, blockNumber int64, allowCache bool, expectedHashStr string) (*blockInfoJSONRPC, ffcapi.ErrorReason, error) {
func (bl *blockListener) getBlockInfoContainsTxHash(ctx context.Context, txHash string) (*ffcapi.MinimalBlockInfo, error) {

// Query the chain to find the transaction block
// Note: should consider have an in-memory map of transaction hash to block for faster lookup
// The extra memory usage of the map should be outweighed by the speed improvement of lookup
// But I saw we have a ffcapi.MinimalBlockInfo struct that intentionally removes the tx hashes
// so need to figure out the reason first
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you going to do this figuring out as part of the PR? I did wonder why the TX hash -> block was not a thing

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, would like to, waiting for @peterbroadhurst 's input


// TODO: add a cache if map cannot be used
res, reason, receiptErr := bl.c.TransactionReceipt(ctx, &ffcapi.TransactionReceiptRequest{
TransactionHash: txHash,
})
if receiptErr != nil && reason != ffcapi.ErrorReasonNotFound {
return nil, i18n.WrapError(ctx, receiptErr, msgs.MsgFailedToQueryReceipt, txHash)
}
if res == nil {
return nil, nil
}
txBlockHash := res.BlockHash
txBlockNumber := res.BlockNumber.Uint64()
// get the parent hash of the transaction block
bi, reason, err := bl.getBlockInfoByNumber(ctx, txBlockNumber, true, txBlockHash)
if err != nil && reason != ffcapi.ErrorReasonNotFound { // if the block info is not found, then there could be a fork, twe don't throw error in this case and treating it as block not found
return nil, i18n.WrapError(ctx, err, msgs.MsgFailedToQueryBlockInfo, txHash)
}
if bi == nil {
return nil, nil
}

return &ffcapi.MinimalBlockInfo{
BlockNumber: fftypes.FFuint64(bi.Number.BigInt().Uint64()),
BlockHash: bi.Hash.String(),
ParentHash: bi.ParentHash.String(),
}, nil
}

func (bl *blockListener) getBlockInfoByNumber(ctx context.Context, blockNumber uint64, allowCache bool, expectedHashStr string) (*blockInfoJSONRPC, ffcapi.ErrorReason, error) {
var blockInfo *blockInfoJSONRPC
if allowCache {
cached, ok := bl.blockCache.Get(strconv.FormatInt(blockNumber, 10))
cached, ok := bl.blockCache.Get(strconv.FormatUint(blockNumber, 10))
if ok {
blockInfo = cached.(*blockInfoJSONRPC)
if expectedHashStr != "" && blockInfo.ParentHash.String() != expectedHashStr {
Expand All @@ -67,16 +103,12 @@ func (bl *blockListener) getBlockInfoByNumber(ctx context.Context, blockNumber i
}

if blockInfo == nil {
rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByNumber", ethtypes.NewHexInteger64(blockNumber), false /* only the txn hashes */)
rpcErr := bl.backend.CallRPC(ctx, &blockInfo, "eth_getBlockByNumber", ethtypes.NewHexIntegerU64(blockNumber), false /* only the txn hashes */)
if rpcErr != nil {
if mapError(blockRPCMethods, rpcErr.Error()) == ffcapi.ErrorReasonNotFound {
log.L(ctx).Debugf("Received error signifying 'block not found': '%s'", rpcErr.Message)
return nil, ffcapi.ErrorReasonNotFound, i18n.NewError(ctx, msgs.MsgBlockNotAvailable)
}
return nil, ffcapi.ErrorReason(""), rpcErr.Error()
}
if blockInfo == nil {
return nil, ffcapi.ErrorReason(""), nil
return nil, ffcapi.ErrorReasonNotFound, i18n.NewError(ctx, msgs.MsgBlockNotAvailable)
}
bl.addToBlockCache(blockInfo)
}
Expand Down
Loading