stocra.synchronous.client
1import logging 2from concurrent.futures import Executor, as_completed 3from decimal import Decimal 4from itertools import count 5from time import sleep 6from typing import Dict, Iterable, List, Optional, Tuple, Union, cast 7 8from requests import HTTPError, RequestException, Session 9 10from stocra.base_client import StocraBase 11from stocra.models import Block, ErrorHandler, StocraHTTPError, Token, Transaction 12 13logger = logging.getLogger("stocra") 14 15 16class Stocra(StocraBase): 17 _session: Session 18 _executor: Optional[Executor] 19 20 def __init__( 21 self, 22 api_key: Optional[str] = None, 23 session: Optional[Session] = None, 24 executor: Optional[Executor] = None, 25 error_handlers: Optional[List[ErrorHandler]] = None, 26 ): 27 super().__init__( 28 api_key=api_key, 29 error_handlers=error_handlers, 30 ) 31 self._session = session or Session() 32 self._executor = executor 33 34 def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block: 35 logger.debug("%s: get_block %s", blockchain, hash_or_height) 36 block_json = self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}") 37 return Block(**block_json) 38 39 def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction: 40 logger.debug("%s: get_transaction %s", blockchain, transaction_hash) 41 transaction_json = self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}") 42 return Transaction(**transaction_json) 43 44 def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]: 45 logger.debug("%s: get_all_transactions %s", blockchain, block.height) 46 if self._executor: 47 futures = [ 48 self._executor.submit(self.get_transaction, blockchain, transaction_hash) 49 for transaction_hash in block.transactions 50 ] 51 for transaction_task in as_completed(futures): 52 yield transaction_task.result() 53 else: 54 for transaction_hash in block.transactions: 55 yield self.get_transaction(blockchain, transaction_hash) 56 57 def stream_new_blocks( 58 self, 59 blockchain: str, 60 start_block_hash_or_height: Union[int, str] = "latest", 61 sleep_interval_seconds: float = 10, 62 ) -> Iterable[Block]: 63 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 64 next_block_height = block.height + 1 65 yield block 66 67 while True: 68 try: 69 block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height) 70 except HTTPError as exception: 71 if exception.response.status_code == 404: 72 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 73 continue 74 75 raise 76 77 next_block_height += 1 78 yield block 79 80 def stream_new_blocks_ahead( 81 self, 82 blockchain: str, 83 start_block_hash_or_height: Union[int, str] = "latest", 84 sleep_interval_seconds: float = 10, 85 n_blocks_ahead: int = 10, 86 ) -> Iterable[Block]: 87 if not self._executor: 88 raise Exception("Works only with executor") 89 90 if n_blocks_ahead < 1: 91 raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`") 92 93 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 94 next_block_height = block.height + 1 95 last_block_height = next_block_height + n_blocks_ahead + 1 96 yield block 97 98 block_tasks = [ 99 self._executor.submit(self.get_block, blockchain, height) 100 for height in range(next_block_height, last_block_height) 101 ] 102 103 while True: 104 block_task = block_tasks.pop(0) 105 try: 106 yield block_task.result() 107 except HTTPError as exception: 108 if exception.response.status_code == 404: 109 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 110 block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height)) 111 continue 112 113 raise 114 115 block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height)) 116 next_block_height += 1 117 last_block_height += 1 118 119 def stream_new_transactions( 120 self, 121 blockchain: str, 122 start_block_hash_or_height: Union[int, str] = "latest", 123 sleep_interval_seconds: float = 10, 124 load_n_blocks_ahead: Optional[int] = None, 125 ) -> Iterable[Tuple[Block, Transaction]]: 126 if load_n_blocks_ahead: 127 new_blocks = self.stream_new_blocks_ahead( 128 blockchain=blockchain, 129 start_block_hash_or_height=start_block_hash_or_height, 130 sleep_interval_seconds=sleep_interval_seconds, 131 n_blocks_ahead=load_n_blocks_ahead, 132 ) 133 else: 134 new_blocks = self.stream_new_blocks( 135 blockchain=blockchain, 136 start_block_hash_or_height=start_block_hash_or_height, 137 sleep_interval_seconds=sleep_interval_seconds, 138 ) 139 140 for block in new_blocks: 141 block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block) 142 for transaction in block_transactions: 143 yield block, transaction 144 145 def get_tokens(self, blockchain: str) -> Dict[str, Token]: 146 if self._tokens.get(blockchain) is None: 147 self._refresh_tokens(blockchain) 148 149 return self._tokens[blockchain] 150 151 def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal: 152 token = self.get_tokens(blockchain)[contract_address] 153 return value * token.scaling 154 155 def _get(self, blockchain: str, endpoint: str) -> dict: # type: ignore[return] 156 for iteration in count(start=1): 157 try: 158 response = self._session.get( 159 f"https://{blockchain}.stocra.com/v1.0/{endpoint}", 160 allow_redirects=False, 161 headers=self.headers, 162 ) 163 response.raise_for_status() 164 return cast(dict, response.json()) 165 except RequestException as exception: 166 error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception) 167 if self._should_continue(error): 168 continue 169 170 raise 171 172 def _should_continue(self, error: StocraHTTPError) -> bool: 173 if not self._error_handlers: 174 return False 175 176 for error_handler in self._error_handlers: 177 retry = error_handler(error) 178 if retry: 179 return True 180 181 return False 182 183 @classmethod 184 def _handle_404_during_block_streaming( 185 cls, blockchain: str, block_height: int, sleep_interval_seconds: float 186 ) -> None: 187 logger.debug( 188 "%s: stream_new_blocks %s: 404, sleeping for %d seconds", 189 blockchain, 190 block_height, 191 sleep_interval_seconds, 192 ) 193 sleep(sleep_interval_seconds) 194 195 def _refresh_tokens(self, blockchain: str) -> None: 196 tokens = self._get(blockchain, "tokens") 197 self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
17class Stocra(StocraBase): 18 _session: Session 19 _executor: Optional[Executor] 20 21 def __init__( 22 self, 23 api_key: Optional[str] = None, 24 session: Optional[Session] = None, 25 executor: Optional[Executor] = None, 26 error_handlers: Optional[List[ErrorHandler]] = None, 27 ): 28 super().__init__( 29 api_key=api_key, 30 error_handlers=error_handlers, 31 ) 32 self._session = session or Session() 33 self._executor = executor 34 35 def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block: 36 logger.debug("%s: get_block %s", blockchain, hash_or_height) 37 block_json = self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}") 38 return Block(**block_json) 39 40 def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction: 41 logger.debug("%s: get_transaction %s", blockchain, transaction_hash) 42 transaction_json = self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}") 43 return Transaction(**transaction_json) 44 45 def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]: 46 logger.debug("%s: get_all_transactions %s", blockchain, block.height) 47 if self._executor: 48 futures = [ 49 self._executor.submit(self.get_transaction, blockchain, transaction_hash) 50 for transaction_hash in block.transactions 51 ] 52 for transaction_task in as_completed(futures): 53 yield transaction_task.result() 54 else: 55 for transaction_hash in block.transactions: 56 yield self.get_transaction(blockchain, transaction_hash) 57 58 def stream_new_blocks( 59 self, 60 blockchain: str, 61 start_block_hash_or_height: Union[int, str] = "latest", 62 sleep_interval_seconds: float = 10, 63 ) -> Iterable[Block]: 64 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 65 next_block_height = block.height + 1 66 yield block 67 68 while True: 69 try: 70 block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height) 71 except HTTPError as exception: 72 if exception.response.status_code == 404: 73 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 74 continue 75 76 raise 77 78 next_block_height += 1 79 yield block 80 81 def stream_new_blocks_ahead( 82 self, 83 blockchain: str, 84 start_block_hash_or_height: Union[int, str] = "latest", 85 sleep_interval_seconds: float = 10, 86 n_blocks_ahead: int = 10, 87 ) -> Iterable[Block]: 88 if not self._executor: 89 raise Exception("Works only with executor") 90 91 if n_blocks_ahead < 1: 92 raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`") 93 94 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 95 next_block_height = block.height + 1 96 last_block_height = next_block_height + n_blocks_ahead + 1 97 yield block 98 99 block_tasks = [ 100 self._executor.submit(self.get_block, blockchain, height) 101 for height in range(next_block_height, last_block_height) 102 ] 103 104 while True: 105 block_task = block_tasks.pop(0) 106 try: 107 yield block_task.result() 108 except HTTPError as exception: 109 if exception.response.status_code == 404: 110 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 111 block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height)) 112 continue 113 114 raise 115 116 block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height)) 117 next_block_height += 1 118 last_block_height += 1 119 120 def stream_new_transactions( 121 self, 122 blockchain: str, 123 start_block_hash_or_height: Union[int, str] = "latest", 124 sleep_interval_seconds: float = 10, 125 load_n_blocks_ahead: Optional[int] = None, 126 ) -> Iterable[Tuple[Block, Transaction]]: 127 if load_n_blocks_ahead: 128 new_blocks = self.stream_new_blocks_ahead( 129 blockchain=blockchain, 130 start_block_hash_or_height=start_block_hash_or_height, 131 sleep_interval_seconds=sleep_interval_seconds, 132 n_blocks_ahead=load_n_blocks_ahead, 133 ) 134 else: 135 new_blocks = self.stream_new_blocks( 136 blockchain=blockchain, 137 start_block_hash_or_height=start_block_hash_or_height, 138 sleep_interval_seconds=sleep_interval_seconds, 139 ) 140 141 for block in new_blocks: 142 block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block) 143 for transaction in block_transactions: 144 yield block, transaction 145 146 def get_tokens(self, blockchain: str) -> Dict[str, Token]: 147 if self._tokens.get(blockchain) is None: 148 self._refresh_tokens(blockchain) 149 150 return self._tokens[blockchain] 151 152 def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal: 153 token = self.get_tokens(blockchain)[contract_address] 154 return value * token.scaling 155 156 def _get(self, blockchain: str, endpoint: str) -> dict: # type: ignore[return] 157 for iteration in count(start=1): 158 try: 159 response = self._session.get( 160 f"https://{blockchain}.stocra.com/v1.0/{endpoint}", 161 allow_redirects=False, 162 headers=self.headers, 163 ) 164 response.raise_for_status() 165 return cast(dict, response.json()) 166 except RequestException as exception: 167 error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception) 168 if self._should_continue(error): 169 continue 170 171 raise 172 173 def _should_continue(self, error: StocraHTTPError) -> bool: 174 if not self._error_handlers: 175 return False 176 177 for error_handler in self._error_handlers: 178 retry = error_handler(error) 179 if retry: 180 return True 181 182 return False 183 184 @classmethod 185 def _handle_404_during_block_streaming( 186 cls, blockchain: str, block_height: int, sleep_interval_seconds: float 187 ) -> None: 188 logger.debug( 189 "%s: stream_new_blocks %s: 404, sleeping for %d seconds", 190 blockchain, 191 block_height, 192 sleep_interval_seconds, 193 ) 194 sleep(sleep_interval_seconds) 195 196 def _refresh_tokens(self, blockchain: str) -> None: 197 tokens = self._get(blockchain, "tokens") 198 self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
Helper class that provides a standard way to create an ABC using inheritance.
Stocra( api_key: Optional[str] = None, session: Optional[requests.sessions.Session] = None, executor: Optional[concurrent.futures._base.Executor] = None, error_handlers: Optional[List[Callable[[stocra.models.StocraHTTPError], Union[bool, Awaitable[bool]]]]] = None)
21 def __init__( 22 self, 23 api_key: Optional[str] = None, 24 session: Optional[Session] = None, 25 executor: Optional[Executor] = None, 26 error_handlers: Optional[List[ErrorHandler]] = None, 27 ): 28 super().__init__( 29 api_key=api_key, 30 error_handlers=error_handlers, 31 ) 32 self._session = session or Session() 33 self._executor = executor
def
get_block( self, blockchain: str, hash_or_height: Union[str, int] = 'latest') -> stocra.models.Block:
def
get_all_transactions_of_block( self, blockchain: str, block: stocra.models.Block) -> Iterable[stocra.models.Transaction]:
45 def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]: 46 logger.debug("%s: get_all_transactions %s", blockchain, block.height) 47 if self._executor: 48 futures = [ 49 self._executor.submit(self.get_transaction, blockchain, transaction_hash) 50 for transaction_hash in block.transactions 51 ] 52 for transaction_task in as_completed(futures): 53 yield transaction_task.result() 54 else: 55 for transaction_hash in block.transactions: 56 yield self.get_transaction(blockchain, transaction_hash)
def
stream_new_blocks( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10) -> Iterable[stocra.models.Block]:
58 def stream_new_blocks( 59 self, 60 blockchain: str, 61 start_block_hash_or_height: Union[int, str] = "latest", 62 sleep_interval_seconds: float = 10, 63 ) -> Iterable[Block]: 64 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 65 next_block_height = block.height + 1 66 yield block 67 68 while True: 69 try: 70 block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height) 71 except HTTPError as exception: 72 if exception.response.status_code == 404: 73 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 74 continue 75 76 raise 77 78 next_block_height += 1 79 yield block
def
stream_new_blocks_ahead( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, n_blocks_ahead: int = 10) -> Iterable[stocra.models.Block]:
81 def stream_new_blocks_ahead( 82 self, 83 blockchain: str, 84 start_block_hash_or_height: Union[int, str] = "latest", 85 sleep_interval_seconds: float = 10, 86 n_blocks_ahead: int = 10, 87 ) -> Iterable[Block]: 88 if not self._executor: 89 raise Exception("Works only with executor") 90 91 if n_blocks_ahead < 1: 92 raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`") 93 94 block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height) 95 next_block_height = block.height + 1 96 last_block_height = next_block_height + n_blocks_ahead + 1 97 yield block 98 99 block_tasks = [ 100 self._executor.submit(self.get_block, blockchain, height) 101 for height in range(next_block_height, last_block_height) 102 ] 103 104 while True: 105 block_task = block_tasks.pop(0) 106 try: 107 yield block_task.result() 108 except HTTPError as exception: 109 if exception.response.status_code == 404: 110 self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds) 111 block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height)) 112 continue 113 114 raise 115 116 block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height)) 117 next_block_height += 1 118 last_block_height += 1
def
stream_new_transactions( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, load_n_blocks_ahead: Optional[int] = None) -> Iterable[Tuple[stocra.models.Block, stocra.models.Transaction]]:
120 def stream_new_transactions( 121 self, 122 blockchain: str, 123 start_block_hash_or_height: Union[int, str] = "latest", 124 sleep_interval_seconds: float = 10, 125 load_n_blocks_ahead: Optional[int] = None, 126 ) -> Iterable[Tuple[Block, Transaction]]: 127 if load_n_blocks_ahead: 128 new_blocks = self.stream_new_blocks_ahead( 129 blockchain=blockchain, 130 start_block_hash_or_height=start_block_hash_or_height, 131 sleep_interval_seconds=sleep_interval_seconds, 132 n_blocks_ahead=load_n_blocks_ahead, 133 ) 134 else: 135 new_blocks = self.stream_new_blocks( 136 blockchain=blockchain, 137 start_block_hash_or_height=start_block_hash_or_height, 138 sleep_interval_seconds=sleep_interval_seconds, 139 ) 140 141 for block in new_blocks: 142 block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block) 143 for transaction in block_transactions: 144 yield block, transaction
def
scale_token_value( self, blockchain: str, contract_address: str, value: decimal.Decimal) -> decimal.Decimal: