Module cardano.wt.blockfrost

Expand source code
import json
import requests
import time

from http import HTTPStatus

from cardano.wt.utxo import Utxo

"""
Repreentation of the Blockfrost web API used in retrieving metadata about txn i/o on the chain.
"""
class BlockfrostApi(object):

    PREPROD_MAGIC = '1'
    PREVIEW_MAGIC = '2'

    _API_CALLS_PER_SEC = 10
    _APPLICATION_JSON = 'application/json'
    _BACKOFF_SEC = 10
    _BURST_TXN_PER_SEC = 10
    _MAX_GET_RETRIES = 9
    _MAX_POST_RETRIES = 3
    _MAX_BURST = 500
    _UTXO_LIST_LIMIT = 100

    def __init__(self, project, mainnet=False, preview=False, max_get_retries=_MAX_GET_RETRIES, max_post_retries=_MAX_POST_RETRIES):
        self.project = project
        self.mainnet = mainnet
        self.preview = preview
        self.built_up_burst = BlockfrostApi._MAX_BURST
        self.bursting = False
        self.curr_sec = int(time.time())
        self.curr_calls = 0
        self.max_get_retries = max_get_retries
        self.max_post_retries = max_post_retries

    def __account_for_rate_limit(self):
        this_time = time.time()
        this_sec = int(this_time)
        if self.curr_sec == this_sec:
            self.curr_calls += 1
        else:
            if self.bursting:
                self.built_up_burst = 0
            else:
                self.built_up_burst = min(BlockfrostApi._MAX_BURST, self.built_up_burst + BlockfrostApi._BURST_TXN_PER_SEC)
            self.bursting = False
            self.curr_sec = this_sec
            self.curr_calls = 1
        if self.curr_calls == BlockfrostApi._API_CALLS_PER_SEC:
            print("Blockfrost API: ENTERING BURST, EXCEEDED ALLOWABLE API CALLS")
            self.bursting = True
        if self.bursting and (self.curr_calls > self.built_up_burst):
            print("Blockfrost API: BEYOND BURST CAPABILITIES, MAY RESULT IN FATAL ERROR")
            time.sleep(1.0  / BlockfrostApi._API_CALLS_PER_SEC)

    def __get_api_base(self):
        identifier = 'mainnet' if self.mainnet else 'preview' if self.preview else 'preprod'
        return f"https://cardano-{identifier}.blockfrost.io/api/v0"

    def __call_with_retries(self, call_func, max_retries):
        self.__account_for_rate_limit()
        retries = 0
        while True:
            try:
                api_resp = call_func()
                print(f"{api_resp.url}: ({api_resp.status_code})")
                print(api_resp.text)
                api_resp.raise_for_status()
                return api_resp.json()
            except requests.exceptions.HTTPError as e:
                if retries < max_retries:
                    retries += 1
                    time.sleep(retries * BlockfrostApi._BACKOFF_SEC)
                else:
                    raise e

    def __call_get_api(self, resource):
        return self.__call_with_retries(
            lambda: requests.get(f"{self.__get_api_base()}/{resource}", headers={'project_id': self.project, 'Content-Type': BlockfrostApi._APPLICATION_JSON}),
            self.max_get_retries
        )

    def __call_paginated_get_api(self, resource):
        current_page = 0
        while True:
            current_page += 1
            arr_data = []
            try:
                arr_data = self.__call_get_api(f"{resource}?count={BlockfrostApi._UTXO_LIST_LIMIT}&page={current_page}")
            except requests.exceptions.HTTPError as e:
                if e.response.status_code != HTTPStatus.NOT_FOUND:
                    raise e
            yield arr_data
            if len(arr_data) < BlockfrostApi._UTXO_LIST_LIMIT:
                return

    def __call_post_api(self, content_type, resource, data):
        return self.__call_with_retries(
            lambda: requests.post(f"{self.__get_api_base()}/{resource}", headers={'project_id': self.project, 'Content-Type': content_type}, data=data),
            self.max_post_retries
        )

    def get_assets(self, policy_id):
        assets = []
        for assets_data in self.__call_paginated_get_api(f"assets/policy/{policy_id}"):
            assets += assets_data
        return assets

    def get_asset(self, asset_id):
        try:
            return self.__call_get_api(f"assets/{asset_id}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == HTTPStatus.NOT_FOUND:
                return None
            raise e

    def get_inputs(self, txn_hash):
        utxo_metadata = self.__call_get_api(f"txs/{txn_hash}/utxos")
        return utxo_metadata['inputs']

    def get_txn(self, txn_hash):
        try:
            return self.__call_get_api(f"txs/{txn_hash}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == HTTPStatus.NOT_FOUND:
                return None
            raise e

    def get_utxos(self, address, exclusions):
        available_utxos = set()
        for utxo_data in self.__call_paginated_get_api(f"addresses/{address}/utxos"):
            #print('EXCLUSIONS\t', [f'{utxo.hash}#{utxo.ix}' for utxo in exclusions])
            for raw_utxo in utxo_data:
                balances = [Utxo.Balance(int(balance['quantity']), balance['unit']) for balance in raw_utxo['amount']]
                utxo = Utxo(raw_utxo['tx_hash'], raw_utxo['output_index'], balances)
                if utxo in exclusions:
                    print(f'Skipping {utxo.hash}#{utxo.ix}')
                    continue
                available_utxos.add(utxo)
        return available_utxos

    def get_protocol_parameters(self):
        return self.__call_get_api('epochs/latest/parameters')

    def submit_txn(self, signed_file):
        with open(signed_file, 'r') as signed_filehandle:
            tx_cbor = json.load(signed_filehandle)['cborHex']
        return self.__call_post_api('application/cbor', '/tx/submit', bytes.fromhex(tx_cbor))

Classes

class BlockfrostApi (project, mainnet=False, preview=False, max_get_retries=9, max_post_retries=3)
Expand source code
class BlockfrostApi(object):

    PREPROD_MAGIC = '1'
    PREVIEW_MAGIC = '2'

    _API_CALLS_PER_SEC = 10
    _APPLICATION_JSON = 'application/json'
    _BACKOFF_SEC = 10
    _BURST_TXN_PER_SEC = 10
    _MAX_GET_RETRIES = 9
    _MAX_POST_RETRIES = 3
    _MAX_BURST = 500
    _UTXO_LIST_LIMIT = 100

    def __init__(self, project, mainnet=False, preview=False, max_get_retries=_MAX_GET_RETRIES, max_post_retries=_MAX_POST_RETRIES):
        self.project = project
        self.mainnet = mainnet
        self.preview = preview
        self.built_up_burst = BlockfrostApi._MAX_BURST
        self.bursting = False
        self.curr_sec = int(time.time())
        self.curr_calls = 0
        self.max_get_retries = max_get_retries
        self.max_post_retries = max_post_retries

    def __account_for_rate_limit(self):
        this_time = time.time()
        this_sec = int(this_time)
        if self.curr_sec == this_sec:
            self.curr_calls += 1
        else:
            if self.bursting:
                self.built_up_burst = 0
            else:
                self.built_up_burst = min(BlockfrostApi._MAX_BURST, self.built_up_burst + BlockfrostApi._BURST_TXN_PER_SEC)
            self.bursting = False
            self.curr_sec = this_sec
            self.curr_calls = 1
        if self.curr_calls == BlockfrostApi._API_CALLS_PER_SEC:
            print("Blockfrost API: ENTERING BURST, EXCEEDED ALLOWABLE API CALLS")
            self.bursting = True
        if self.bursting and (self.curr_calls > self.built_up_burst):
            print("Blockfrost API: BEYOND BURST CAPABILITIES, MAY RESULT IN FATAL ERROR")
            time.sleep(1.0  / BlockfrostApi._API_CALLS_PER_SEC)

    def __get_api_base(self):
        identifier = 'mainnet' if self.mainnet else 'preview' if self.preview else 'preprod'
        return f"https://cardano-{identifier}.blockfrost.io/api/v0"

    def __call_with_retries(self, call_func, max_retries):
        self.__account_for_rate_limit()
        retries = 0
        while True:
            try:
                api_resp = call_func()
                print(f"{api_resp.url}: ({api_resp.status_code})")
                print(api_resp.text)
                api_resp.raise_for_status()
                return api_resp.json()
            except requests.exceptions.HTTPError as e:
                if retries < max_retries:
                    retries += 1
                    time.sleep(retries * BlockfrostApi._BACKOFF_SEC)
                else:
                    raise e

    def __call_get_api(self, resource):
        return self.__call_with_retries(
            lambda: requests.get(f"{self.__get_api_base()}/{resource}", headers={'project_id': self.project, 'Content-Type': BlockfrostApi._APPLICATION_JSON}),
            self.max_get_retries
        )

    def __call_paginated_get_api(self, resource):
        current_page = 0
        while True:
            current_page += 1
            arr_data = []
            try:
                arr_data = self.__call_get_api(f"{resource}?count={BlockfrostApi._UTXO_LIST_LIMIT}&page={current_page}")
            except requests.exceptions.HTTPError as e:
                if e.response.status_code != HTTPStatus.NOT_FOUND:
                    raise e
            yield arr_data
            if len(arr_data) < BlockfrostApi._UTXO_LIST_LIMIT:
                return

    def __call_post_api(self, content_type, resource, data):
        return self.__call_with_retries(
            lambda: requests.post(f"{self.__get_api_base()}/{resource}", headers={'project_id': self.project, 'Content-Type': content_type}, data=data),
            self.max_post_retries
        )

    def get_assets(self, policy_id):
        assets = []
        for assets_data in self.__call_paginated_get_api(f"assets/policy/{policy_id}"):
            assets += assets_data
        return assets

    def get_asset(self, asset_id):
        try:
            return self.__call_get_api(f"assets/{asset_id}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == HTTPStatus.NOT_FOUND:
                return None
            raise e

    def get_inputs(self, txn_hash):
        utxo_metadata = self.__call_get_api(f"txs/{txn_hash}/utxos")
        return utxo_metadata['inputs']

    def get_txn(self, txn_hash):
        try:
            return self.__call_get_api(f"txs/{txn_hash}")
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == HTTPStatus.NOT_FOUND:
                return None
            raise e

    def get_utxos(self, address, exclusions):
        available_utxos = set()
        for utxo_data in self.__call_paginated_get_api(f"addresses/{address}/utxos"):
            #print('EXCLUSIONS\t', [f'{utxo.hash}#{utxo.ix}' for utxo in exclusions])
            for raw_utxo in utxo_data:
                balances = [Utxo.Balance(int(balance['quantity']), balance['unit']) for balance in raw_utxo['amount']]
                utxo = Utxo(raw_utxo['tx_hash'], raw_utxo['output_index'], balances)
                if utxo in exclusions:
                    print(f'Skipping {utxo.hash}#{utxo.ix}')
                    continue
                available_utxos.add(utxo)
        return available_utxos

    def get_protocol_parameters(self):
        return self.__call_get_api('epochs/latest/parameters')

    def submit_txn(self, signed_file):
        with open(signed_file, 'r') as signed_filehandle:
            tx_cbor = json.load(signed_filehandle)['cborHex']
        return self.__call_post_api('application/cbor', '/tx/submit', bytes.fromhex(tx_cbor))

Class variables

var PREPROD_MAGIC
var PREVIEW_MAGIC

Methods

def get_asset(self, asset_id)
Expand source code
def get_asset(self, asset_id):
    try:
        return self.__call_get_api(f"assets/{asset_id}")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == HTTPStatus.NOT_FOUND:
            return None
        raise e
def get_assets(self, policy_id)
Expand source code
def get_assets(self, policy_id):
    assets = []
    for assets_data in self.__call_paginated_get_api(f"assets/policy/{policy_id}"):
        assets += assets_data
    return assets
def get_inputs(self, txn_hash)
Expand source code
def get_inputs(self, txn_hash):
    utxo_metadata = self.__call_get_api(f"txs/{txn_hash}/utxos")
    return utxo_metadata['inputs']
def get_protocol_parameters(self)
Expand source code
def get_protocol_parameters(self):
    return self.__call_get_api('epochs/latest/parameters')
def get_txn(self, txn_hash)
Expand source code
def get_txn(self, txn_hash):
    try:
        return self.__call_get_api(f"txs/{txn_hash}")
    except requests.exceptions.HTTPError as e:
        if e.response.status_code == HTTPStatus.NOT_FOUND:
            return None
        raise e
def get_utxos(self, address, exclusions)
Expand source code
def get_utxos(self, address, exclusions):
    available_utxos = set()
    for utxo_data in self.__call_paginated_get_api(f"addresses/{address}/utxos"):
        #print('EXCLUSIONS\t', [f'{utxo.hash}#{utxo.ix}' for utxo in exclusions])
        for raw_utxo in utxo_data:
            balances = [Utxo.Balance(int(balance['quantity']), balance['unit']) for balance in raw_utxo['amount']]
            utxo = Utxo(raw_utxo['tx_hash'], raw_utxo['output_index'], balances)
            if utxo in exclusions:
                print(f'Skipping {utxo.hash}#{utxo.ix}')
                continue
            available_utxos.add(utxo)
    return available_utxos
def submit_txn(self, signed_file)
Expand source code
def submit_txn(self, signed_file):
    with open(signed_file, 'r') as signed_filehandle:
        tx_cbor = json.load(signed_filehandle)['cborHex']
    return self.__call_post_api('application/cbor', '/tx/submit', bytes.fromhex(tx_cbor))