Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Blockslog reader, more flexible, backwards and forward. #144

Open
wants to merge 1 commit into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 146 additions & 36 deletions blockslog/blockslog.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,43 @@ package blockslog
import (
"encoding/binary"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"os"

"github.com/eoscanada/eos-go"
)

func Process(filename string) error {
fl, err := os.Open(filename)
type Reader struct {
filename string
fl *os.File

Version uint8
FirstBlockNum uint32
ChainID string

firstOffset int64
nextOffset int64
prevOffset int64
}

func NewReader(filename string) *Reader {
return &Reader{filename: filename}
}

func (r *Reader) Close() error {
if r.fl != nil {
return r.fl.Close()
}
return nil
}

func (r *Reader) ReadHeader() error {
fl, err := os.Open(r.filename)
if err != nil {
return err
}
r.fl = fl

versionData := make([]byte, 4)
_, err = fl.Read(versionData)
Expand All @@ -24,21 +48,24 @@ func Process(filename string) error {
}

version := versionData[0]
r.Version = version

fmt.Println("Version", version)
// fmt.Println("Version", version)

firstBlockData := []byte{1, 0, 0, 0}

if version > 1 {
fmt.Println("Reading first block")
// fmt.Println("Reading first block")
_, err = fl.Read(firstBlockData)
if err != nil {
return err
}
}

firstBlockNum := binary.LittleEndian.Uint32(firstBlockData)
fmt.Println("First block", firstBlockNum)

r.FirstBlockNum = firstBlockNum
// fmt.Println("First block", firstBlockNum)

// Certain conditions where the genesis state is written:
// bool block_log::contains_genesis_state(uint32_t version, uint32_t first_block_num) {
Expand All @@ -47,59 +74,142 @@ func Process(filename string) error {

chainID := make([]byte, 32)
if version >= 3 && firstBlockNum > 1 {
fmt.Println("Reading Chain ID")
_, err = fl.Read(chainID)
if err != nil {
return err
}
}

fmt.Println("Chain ID", hex.EncodeToString(chainID))
r.ChainID = hex.EncodeToString(chainID)

if version != 1 {
totem := make([]byte, 8)
_, err := fl.Read(totem)
if err != nil {
return err
}
fmt.Println("Totem", totem)
}

for i := 0 ; i < 5; i++{
cnt := make([]byte, 1000000)
startPos, err := fl.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}

prevPos, err := fl.Seek(0, os.SEEK_CUR)
if err != nil {
return err
}
r.firstOffset = startPos

_, err = fl.Read(cnt)
if err == io.EOF {
break
}
if err != nil {
return err
}
r.First()

d := eos.NewDecoder(cnt)
var block *eos.SignedBlock
return nil
}

if err := d.Decode(&block); err != nil {
return fmt.Errorf("decoding signed block: %w", err)
}
func (r *Reader) Next() (*eos.SignedBlock, []byte, error) {
if r.nextOffset == -1 {
return nil, nil, io.EOF
}

jsonStr, err := json.Marshal(block)
if err != nil {
return err
}
_, err := r.fl.Seek(r.nextOffset, os.SEEK_SET)
if err != nil {
return nil, nil, err
}

fmt.Println(string(jsonStr))
blk, bytesRead, err := r.readSignedBlock()
if err != nil {
return nil, nil, err
}

if _, err = fl.Seek(prevPos+int64(d.LastPos())+8, os.SEEK_SET); err != nil {
return err
}
fmt.Println("Last pos", d.LastPos())
r.prevOffset = r.nextOffset
r.nextOffset = r.nextOffset + int64(len(bytesRead)) + 8

return blk, bytesRead, nil
}

func (r *Reader) First() {
r.nextOffset = r.firstOffset
r.prevOffset = -1
}

func (r *Reader) Last() error {
_, err := r.fl.Seek(-8, os.SEEK_END)
if err != nil {
return err
}

cnt := make([]byte, 8)

_, err = r.fl.Read(cnt)
if err != nil {
return err
}

lastBlockOffset := binary.LittleEndian.Uint64(cnt)

// TODO: perhaps check if the blocks log is empty, and the whole offset
// thing is less than a single block's size..

r.prevOffset = int64(lastBlockOffset)
r.nextOffset = -1

return nil
}

func (r *Reader) Prev() (*eos.SignedBlock, []byte, error) {
if r.prevOffset == -1 {
return nil, nil, io.EOF
}

_, err := r.fl.Seek(r.prevOffset-8, os.SEEK_SET)
if err != nil {
return nil, nil, fmt.Errorf("seek -8: %w", err)
}

prevOffsetBin := make([]byte, 8)
_, err = r.fl.Read(prevOffsetBin)
if err != nil {
return nil, nil, fmt.Errorf("read offset: %w", err)
}
prevOffset := binary.LittleEndian.Uint64(prevOffsetBin)

blk, bytesRead, err := r.readSignedBlock()
if err != nil {
return nil, nil, fmt.Errorf("read signed block: %w", err)
}

r.nextOffset = r.prevOffset
r.prevOffset = int64(prevOffset)

return blk, bytesRead, nil
}

func (r *Reader) readSignedBlock() (block *eos.SignedBlock, bytesRead []byte, err error) {
cnt := make([]byte, 1000000)

// prevPos, err := r.fl.Seek(offset, os.SEEK_SET)
// if err != nil {
// return
// }

_, err = r.fl.Read(cnt)
if err != nil {
return
}

d := eos.NewDecoder(cnt)

if err = d.Decode(&block); err != nil {
err = fmt.Errorf("decoding signed block: %w", err)
return
}

// jsonStr, err := json.Marshal(block)
// if err != nil {
// return err
// }

// fmt.Println(string(jsonStr))

bytesRead = cnt[:d.LastPos()]

// r.nextOffset = prevPos + int64(d.LastPos()) + 8
// r.prevOffset = prevPos
return
}
4 changes: 1 addition & 3 deletions blockslog/blockslog_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package blockslog

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMe(t *testing.T) {
require.NoError(t, Process("/home/abourget/dfuse/dfuse-eosio/proj/mainnet/mindreader/data/blocks/blocks.log"))
//require.NoError(t, Process("/home/abourget/dfuse/dfuse-eosio/proj/mainnet/mindreader/data/blocks/blocks.log"))
}
Loading