|
import pickle |
|
import hashlib |
|
import time |
|
import plyvel |
|
import struct |
|
from typing import List |
|
from hashlib import sha256 |
|
import streamlit as st |
|
|
|
|
|
# Proof-of-work difficulty: a block hash, read as a 256-bit big-endian integer,
# must be below 1 << (256 - TARGET_BITS), i.e. its top TARGET_BITS bits are zero.
TARGET_BITS = 24
|
|
|
class Block:
    """A single block of the chain.

    Attributes:
        index: Position number assigned to the block by its creator.
        previous_hash: Hash of the preceding block, as bytes.
        timestamp: Creation time; callers in this file pass ``time.time()``.
        data: Payload carried by the block, as bytes.
        nonce: Proof-of-work counter stored with the block.
        hash: Hash identifying this block, as bytes.
    """

    def __init__(self, index, previous_hash, timestamp, data, nonce, hash):
        # The ``hash`` parameter shadows the builtin, but its name is also the
        # key used in the serialized form, so it is kept for compatibility.
        self.index = index
        self.previous_hash = previous_hash
        self.timestamp = timestamp
        self.data = data
        self.nonce = nonce
        self.hash = hash

    def serialize(self):
        """Serialize the block's attribute dictionary to a pickle byte string."""
        return pickle.dumps(self.__dict__)

    @classmethod
    def deserialize(cls, serialized_block):
        """Rebuild a block from the bytes produced by :meth:`serialize`.

        Being a classmethod, this returns an instance of the class it is
        called on, so subclasses round-trip to their own type.

        Args:
            serialized_block: Pickled attribute dictionary of a block.

        Returns:
            A new instance populated from the pickled attributes.
        """
        # SECURITY: pickle.loads can execute arbitrary code. Only feed it bytes
        # that this application wrote itself, never data from an untrusted source.
        block_dict = pickle.loads(serialized_block)
        return cls(**block_dict)

    def __repr__(self):
        # errors='replace' keeps repr() usable when the payload is not valid
        # UTF-8; valid payloads are rendered exactly as before.
        text = self.data.decode('utf-8', errors='replace')
        return (
            f"Block(index={self.index}, "
            f"timestamp={self.timestamp}, "
            f"data={text}, "
            f"prev_block_hash={self.previous_hash.hex()}, "
            f"hash={self.hash.hex()}, "
            f"nonce={self.nonce})"
        )
|
|
|
class ProofOfWork:
    """Brute-force search for a nonce that makes a block's digest small enough.

    A nonce is accepted when the SHA-256 digest of the prepared block data,
    read as a big-endian integer, is strictly below ``1 << (256 - target_bits)``.
    """

    # Nonces are packed as unsigned 32-bit integers, which bounds the search.
    MAX_NONCE = 2**32

    def __init__(self, block: "Block", target_bits=None):
        """Bind the proof of work to ``block``.

        Args:
            block: Block whose ``previous_hash``, ``data``, ``timestamp`` and
                ``nonce`` attributes are read.
            target_bits: Number of leading zero bits required of a digest.
                Defaults to the module-level ``TARGET_BITS``.

        Raises:
            ValueError: If ``target_bits`` is outside ``0..256``.
        """
        if target_bits is None:
            target_bits = TARGET_BITS
        if not 0 <= target_bits <= 256:
            raise ValueError(f"target_bits must be in 0..256, got {target_bits}")
        self.block = block
        self.target_bits = target_bits
        self.target = 1 << (256 - target_bits)

    def prepare_data(self, nonce: int) -> bytes:
        """Return the byte string that is hashed for ``nonce``.

        Layout: previous hash, payload, then the timestamp (truncated to whole
        seconds), the difficulty and the nonce, each as a big-endian unsigned
        32-bit integer.
        """
        return b''.join((
            self.block.previous_hash,
            self.block.data,
            # '>' disables padding, so this equals three separate '>I' packs.
            struct.pack('>III', int(self.block.timestamp), self.target_bits, nonce),
        ))

    def run(self) -> "tuple[int, bytes]":
        """Search nonces from 0 upward until one satisfies the target.

        Progress is reported through Streamlit: one message before mining and
        the winning digest afterwards. Candidate digests are deliberately not
        written one by one, because every ``st.write`` call adds an element to
        the page and the search typically needs millions of attempts.

        Returns:
            ``(nonce, digest)`` for the first nonce whose digest is below the target.

        Raises:
            RuntimeError: If no nonce in ``range(MAX_NONCE)`` satisfies the target.
        """
        st.write(f"Mining the block containing \"{self.block.data.decode('utf-8')}\"")
        for nonce in range(self.MAX_NONCE):
            digest = sha256(self.prepare_data(nonce)).digest()
            if int.from_bytes(digest, byteorder='big') < self.target:
                st.write(digest.hex())
                st.write("\n\n")
                return nonce, digest
        # Returning the last candidate here would hand the caller a nonce that
        # neither fits in 32 bits nor satisfies the target.
        raise RuntimeError(
            f"no nonce below {self.MAX_NONCE} satisfies target_bits={self.target_bits}"
        )

    def validate(self) -> bool:
        """Check that the block's stored nonce satisfies the target.

        The digest is recomputed from the block's fields; it is not compared
        with the block's stored ``hash`` attribute.
        """
        digest = sha256(self.prepare_data(self.block.nonce)).digest()
        return int.from_bytes(digest, byteorder='big') < self.target
|
|
|
def new_genesis_block():
    """Build the first block of a chain.

    The genesis block has index 0 and nonce 0, a placeholder previous hash of
    64 ASCII ``'0'`` bytes, the current time as its timestamp, and the fixed
    identifier ``b'genesis_hash'`` in place of a mined hash.
    """
    genesis_fields = {
        'index': 0,
        'previous_hash': b'0' * 64,
        'timestamp': time.time(),
        'data': 'Genesis Block'.encode('utf-8'),
        'nonce': 0,
        'hash': b'genesis_hash',
    }
    return Block(**genesis_fields)
|
|
|
class Blockchain:
    """Chain of blocks persisted in a LevelDB database through ``plyvel``.

    Blocks are stored under the ``b'blocks'`` key prefix, keyed by their hash.
    The key ``b'l'`` under the same prefix holds the hash of the newest block.
    Instances can be used as context managers; the database is closed on exit.
    """

    _BLOCKS_PREFIX = b'blocks'
    _TIP_KEY = b'l'

    def __init__(self):
        self.db = plyvel.DB('blockchain.db', create_if_missing=True)
        self.tip = self._init_blockchain()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _blocks(self):
        """Return the prefixed view of the database that holds the blocks."""
        return self.db.prefixed_db(self._BLOCKS_PREFIX)

    def _init_blockchain(self):
        """Return the stored tip hash, creating the genesis block if none exists."""
        bucket = self._blocks()
        tip = bucket.get(self._TIP_KEY)
        if tip is not None:
            return tip

        genesis = new_genesis_block()
        bucket.put(genesis.hash, genesis.serialize())
        bucket.put(self._TIP_KEY, genesis.hash)
        return genesis.hash

    def add_block(self, data):
        """Mine a block carrying ``data`` and append it to the chain.

        Args:
            data: Text payload; it is stored UTF-8 encoded.
        """
        bucket = self._blocks()
        new_block = Block(
            index=self.get_block_count(),
            previous_hash=bucket.get(self._TIP_KEY),
            timestamp=time.time(),
            data=data.encode('utf-8'),
            nonce=0,
            hash=b'',
        )
        new_block.nonce, new_block.hash = ProofOfWork(new_block).run()

        # Store the block first, then move the tip pointer to it.
        bucket.put(new_block.hash, new_block.serialize())
        bucket.put(self._TIP_KEY, new_block.hash)
        self.tip = new_block.hash

    def get_block_count(self):
        """Return the number of blocks stored in the chain.

        The tip-pointer key lives under the same prefix as the blocks, so it is
        excluded from the count; otherwise the first mined block would get index 2.
        """
        # Only keys are needed, so skip loading the serialized blocks.
        keys = self._blocks().iterator(include_value=False)
        return sum(1 for key in keys if key != self._TIP_KEY)

    def get_last_block_hash(self):
        """Return the hash of the newest block, or ``None`` if no tip is stored."""
        return self._blocks().get(self._TIP_KEY)

    def get_block(self, block_hash):
        """Look up a block in the LevelDB store by its hash.

        Returns:
            The deserialized block, or ``None`` when nothing is stored under
            ``block_hash``.
        """
        serialized_block = self._blocks().get(block_hash)
        if not serialized_block:
            return None
        return Block.deserialize(serialized_block)

    def close(self):
        """Close the database connection."""
        self.db.close()
|
|
|
def print_blockchain(bc):
    """Render the chain with Streamlit, walking from the newest block backwards.

    For each block the previous hash, payload, hash, nonce and the result of
    the proof-of-work validation are written, followed by a horizontal rule.
    The walk stops at the block whose previous hash is the 64-byte ``b'0'``
    placeholder, or when a referenced block cannot be found.

    Args:
        bc: Chain object providing ``get_last_block_hash()`` and ``get_block(hash)``.
    """
    current_hash = bc.get_last_block_hash()
    while current_hash:
        block = bc.get_block(current_hash)
        if block is None:
            st.write("Block not found for hash:", current_hash.hex())
            break

        st.write(f"Prev. hash: {block.previous_hash.hex()}")
        st.write(f"Data: {block.data.decode('utf-8')}")
        st.write(f"Hash: {block.hash.hex()}")
        st.write(f"Nonce: {block.nonce}")
        st.write(f"PoW: {ProofOfWork(block).validate()}")
        # A bare st.write() renders nothing; a markdown rule gives the
        # separator between blocks that the empty call was meant to produce.
        st.write("---")

        if block.previous_hash == b'0'*64:
            break
        current_hash = block.previous_hash
|
|
|
|
|
# Script body: open the chain, mine one demo block, render the chain, and
# always release the database handle afterwards.
try:
    bc = Blockchain()
except (IOError, plyvel.Error) as e:
    # plyvel.IOError also derives from the builtin IOError, but other LevelDB
    # failures (such as a corrupted database) derive only from plyvel.Error.
    st.write(f"Error initializing blockchain: {e}")
    bc = None

if bc is not None:
    try:
        starbucks_data = "Pay 0.0001 BTC for a Starbucks reusable Holiday cup coffee ($9.20 USD)"
        bc.add_block(starbucks_data)

        print_blockchain(bc)
    finally:
        # Close the database even if mining or rendering fails.
        bc.close()

st.title("Blockchain Prototype 4")
st.image('Starbucks BTC Model 1.jpeg', caption='Starbucks BTC Model 1')
st.image('Starbucks BTC Model 2.jpeg', caption='Starbucks BTC Model 2')