stocra.asynchronous.client
import asyncio
import inspect
import logging
from asyncio import Semaphore
from contextlib import asynccontextmanager
from decimal import Decimal
from itertools import count
from typing import (
    AsyncGenerator,
    AsyncIterable,
    Awaitable,
    Dict,
    List,
    Optional,
    Tuple,
    Union,
    cast,
)

from aiohttp import ClientError, ClientResponseError, ClientSession

from stocra.base_client import StocraBase
from stocra.models import Block, ErrorHandler, StocraHTTPError, Token, Transaction

logger = logging.getLogger("stocra")


class Stocra(StocraBase):
    """Asynchronous Stocra API client built on an aiohttp ``ClientSession``.

    Requests go to ``https://{blockchain}.stocra.com/v1.0/{endpoint}``. An
    optional semaphore limits how many block/transaction requests run at once,
    and optional error handlers decide whether a failed request is retried.
    """

    _session: ClientSession
    _semaphore: Optional[Semaphore]
    _error_handlers: List[ErrorHandler]

    def __init__(
        self,
        api_key: Optional[str] = None,
        session: Optional[ClientSession] = None,
        semaphore: Optional[Semaphore] = None,
        error_handlers: Optional[List[ErrorHandler]] = None,
    ) -> None:
        """Create the client.

        Args:
            api_key: Forwarded to ``StocraBase``.
            session: Session used for all requests; a new ``ClientSession`` is
                created when none is given.
            semaphore: Optional limiter acquired around every block and
                transaction request.
            error_handlers: Forwarded to ``StocraBase``; consulted by
                ``_should_continue`` when a request fails.
        """
        super().__init__(
            api_key=api_key,
            error_handlers=error_handlers,
        )

        self._session = session or ClientSession()
        self._semaphore = semaphore

    async def close(self) -> None:
        """Close the underlying session (also when it was supplied by the caller)."""
        await self._session.close()

    async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
        """Fetch ``blocks/{hash_or_height}`` and return it as a ``Block``."""
        logger.debug("%s: get_block %s", blockchain, hash_or_height)
        async with self._with_semaphore():
            block_json = await self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
            return Block(**block_json)

    async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
        """Fetch ``transactions/{transaction_hash}`` and return it as a ``Transaction``."""
        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
        async with self._with_semaphore():
            transaction_json = await self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
            return Transaction(**transaction_json)

    async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
        """Yield every transaction of ``block`` in completion order.

        One task per hash in ``block.transactions`` is started up front, so the
        yield order is whatever finishes first, not the order in the block.
        Tasks still running when the generator is closed, or when one request
        fails, are cancelled instead of being left running in the background.
        """
        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
        transaction_tasks = [
            asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
            for transaction_hash in block.transactions
        ]

        try:
            for completed_task in asyncio.as_completed(transaction_tasks):
                yield await completed_task
        finally:
            # cancel() is a no-op for tasks that already finished.
            for task in transaction_tasks:
                task.cancel()

    async def stream_new_blocks(
        self,
        blockchain: str,
        start_block_hash_or_height: Union[int, str] = "latest",
        sleep_interval_seconds: float = 10,
        n_blocks_ahead: int = 1,
    ) -> AsyncIterable[Block]:
        """Yield the start block and then every following block, indefinitely.

        After the start block, ``n_blocks_ahead + 1`` consecutive heights are
        requested concurrently and the window slides forward by one height per
        yielded block. A 404 for the next expected height triggers a sleep of
        ``sleep_interval_seconds`` followed by a new request for that height.

        Raises:
            ValueError: If ``n_blocks_ahead`` is smaller than 1.
            ClientResponseError: For any HTTP error other than 404.
        """
        if n_blocks_ahead < 1:
            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")

        block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
        first_block_to_load_height = block.height + 1
        last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
        yield block

        block_tasks = [
            asyncio.create_task(self.get_block(blockchain, height))
            for height in range(first_block_to_load_height, last_block_to_load_height)
        ]

        try:
            while True:
                block_task = block_tasks.pop(0)
                try:
                    # Only the fetch is guarded; the yield happens outside the try.
                    next_block = await block_task
                except ClientResponseError as exception:
                    if exception.status != 404:
                        raise

                    logger.debug(
                        "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
                        blockchain,
                        first_block_to_load_height,
                        sleep_interval_seconds,
                    )
                    await asyncio.sleep(sleep_interval_seconds)
                    block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
                    continue

                yield next_block

                block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
                first_block_to_load_height += 1
                last_block_to_load_height += 1
        finally:
            # Do not leave prefetch requests running once the stream is abandoned or fails.
            for pending_task in block_tasks:
                pending_task.cancel()

    async def stream_new_transactions(
        self,
        blockchain: str,
        start_block_hash_or_height: Union[int, str] = "latest",
        sleep_interval_seconds: float = 10,
        load_n_blocks_ahead: int = 1,
    ) -> AsyncIterable[Tuple[Block, Transaction]]:
        """Yield ``(block, transaction)`` pairs for every block from ``stream_new_blocks``.

        ``load_n_blocks_ahead`` is passed to ``stream_new_blocks`` as
        ``n_blocks_ahead``; the other arguments are passed through unchanged.
        """
        new_blocks = self.stream_new_blocks(
            blockchain=blockchain,
            start_block_hash_or_height=start_block_hash_or_height,
            sleep_interval_seconds=sleep_interval_seconds,
            n_blocks_ahead=load_n_blocks_ahead,
        )
        async for block in new_blocks:
            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
            async for transaction in block_transactions:
                yield block, transaction

    async def get_tokens(self, blockchain: str) -> Dict[str, Token]:
        """Return the tokens of ``blockchain``, fetching them when not cached yet."""
        # NOTE(review): self._tokens is presumably initialised by StocraBase - confirm.
        if self._tokens.get(blockchain) is None:
            await self._refresh_tokens(blockchain)

        return self._tokens[blockchain]

    async def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
        """Multiply ``value`` by the ``scaling`` of the token at ``contract_address``.

        Raises:
            KeyError: If ``contract_address`` is not among the blockchain's tokens.
        """
        tokens = await self.get_tokens(blockchain)
        token = tokens[contract_address]
        return value * token.scaling

    async def _acquire(self) -> None:
        """Acquire the semaphore when one is configured."""
        if self._semaphore:
            await self._semaphore.acquire()

    def _release(self) -> None:
        """Release the semaphore when one is configured."""
        if self._semaphore:
            self._semaphore.release()

    @asynccontextmanager
    async def _with_semaphore(self) -> AsyncGenerator[None, None]:
        """Hold the optional semaphore for the duration of the ``async with`` body."""
        await self._acquire()
        try:
            yield
        finally:
            self._release()

    async def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
        """GET ``endpoint`` of ``blockchain`` and return the decoded JSON body.

        On ``ClientError`` or ``asyncio.TimeoutError`` the error handlers are
        consulted; the request is repeated for as long as a handler asks for a
        retry, otherwise the original exception is re-raised.
        """
        for iteration in count(start=1):
            try:
                # The context manager guarantees the response is released on every path.
                async with self._session.get(
                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
                    raise_for_status=True,
                    allow_redirects=False,
                    headers=self.headers,
                ) as response:
                    return cast(dict, await response.json())
            except (ClientError, asyncio.TimeoutError) as exception:
                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
                if await self._should_continue(error):
                    continue

                raise

    async def _should_continue(self, error: StocraHTTPError) -> bool:
        """Return True as soon as one error handler asks for a retry.

        Handlers may return either a ``bool`` or an awaitable resolving to one;
        only awaitable results are awaited.
        """
        if not self._error_handlers:
            return False

        for error_handler in self._error_handlers:
            retry = error_handler(error)
            if inspect.isawaitable(retry):
                retry = await cast(Awaitable[bool], retry)
            if retry:
                return True

        return False

    async def _refresh_tokens(self, blockchain: str) -> None:
        """Download the ``tokens`` endpoint and cache it as ``Token`` objects by contract address."""
        tokens = await self._get(blockchain, "tokens")
        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
class Stocra(StocraBase):
    """Asynchronous Stocra API client built on an aiohttp ``ClientSession``.

    Requests go to ``https://{blockchain}.stocra.com/v1.0/{endpoint}``. An
    optional semaphore limits how many block/transaction requests run at once,
    and optional error handlers decide whether a failed request is retried.
    """

    _session: ClientSession
    _semaphore: Optional[Semaphore]
    _error_handlers: List[ErrorHandler]

    def __init__(
        self,
        api_key: Optional[str] = None,
        session: Optional[ClientSession] = None,
        semaphore: Optional[Semaphore] = None,
        error_handlers: Optional[List[ErrorHandler]] = None,
    ) -> None:
        """Create the client.

        Args:
            api_key: Forwarded to ``StocraBase``.
            session: Session used for all requests; a new ``ClientSession`` is
                created when none is given.
            semaphore: Optional limiter acquired around every block and
                transaction request.
            error_handlers: Forwarded to ``StocraBase``; consulted by
                ``_should_continue`` when a request fails.
        """
        super().__init__(
            api_key=api_key,
            error_handlers=error_handlers,
        )

        self._session = session or ClientSession()
        self._semaphore = semaphore

    async def close(self) -> None:
        """Close the underlying session (also when it was supplied by the caller)."""
        await self._session.close()

    async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
        """Fetch ``blocks/{hash_or_height}`` and return it as a ``Block``."""
        logger.debug("%s: get_block %s", blockchain, hash_or_height)
        async with self._with_semaphore():
            block_json = await self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
            return Block(**block_json)

    async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
        """Fetch ``transactions/{transaction_hash}`` and return it as a ``Transaction``."""
        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
        async with self._with_semaphore():
            transaction_json = await self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
            return Transaction(**transaction_json)

    async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
        """Yield every transaction of ``block`` in completion order.

        One task per hash in ``block.transactions`` is started up front, so the
        yield order is whatever finishes first, not the order in the block.
        Tasks still running when the generator is closed, or when one request
        fails, are cancelled instead of being left running in the background.
        """
        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
        transaction_tasks = [
            asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
            for transaction_hash in block.transactions
        ]

        try:
            for completed_task in asyncio.as_completed(transaction_tasks):
                yield await completed_task
        finally:
            # cancel() is a no-op for tasks that already finished.
            for task in transaction_tasks:
                task.cancel()

    async def stream_new_blocks(
        self,
        blockchain: str,
        start_block_hash_or_height: Union[int, str] = "latest",
        sleep_interval_seconds: float = 10,
        n_blocks_ahead: int = 1,
    ) -> AsyncIterable[Block]:
        """Yield the start block and then every following block, indefinitely.

        After the start block, ``n_blocks_ahead + 1`` consecutive heights are
        requested concurrently and the window slides forward by one height per
        yielded block. A 404 for the next expected height triggers a sleep of
        ``sleep_interval_seconds`` followed by a new request for that height.

        Raises:
            ValueError: If ``n_blocks_ahead`` is smaller than 1.
            ClientResponseError: For any HTTP error other than 404.
        """
        if n_blocks_ahead < 1:
            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")

        block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
        first_block_to_load_height = block.height + 1
        last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
        yield block

        block_tasks = [
            asyncio.create_task(self.get_block(blockchain, height))
            for height in range(first_block_to_load_height, last_block_to_load_height)
        ]

        try:
            while True:
                block_task = block_tasks.pop(0)
                try:
                    # Only the fetch is guarded; the yield happens outside the try.
                    next_block = await block_task
                except ClientResponseError as exception:
                    if exception.status != 404:
                        raise

                    logger.debug(
                        "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
                        blockchain,
                        first_block_to_load_height,
                        sleep_interval_seconds,
                    )
                    await asyncio.sleep(sleep_interval_seconds)
                    block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
                    continue

                yield next_block

                block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
                first_block_to_load_height += 1
                last_block_to_load_height += 1
        finally:
            # Do not leave prefetch requests running once the stream is abandoned or fails.
            for pending_task in block_tasks:
                pending_task.cancel()

    async def stream_new_transactions(
        self,
        blockchain: str,
        start_block_hash_or_height: Union[int, str] = "latest",
        sleep_interval_seconds: float = 10,
        load_n_blocks_ahead: int = 1,
    ) -> AsyncIterable[Tuple[Block, Transaction]]:
        """Yield ``(block, transaction)`` pairs for every block from ``stream_new_blocks``.

        ``load_n_blocks_ahead`` is passed to ``stream_new_blocks`` as
        ``n_blocks_ahead``; the other arguments are passed through unchanged.
        """
        new_blocks = self.stream_new_blocks(
            blockchain=blockchain,
            start_block_hash_or_height=start_block_hash_or_height,
            sleep_interval_seconds=sleep_interval_seconds,
            n_blocks_ahead=load_n_blocks_ahead,
        )
        async for block in new_blocks:
            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
            async for transaction in block_transactions:
                yield block, transaction

    async def get_tokens(self, blockchain: str) -> Dict[str, Token]:
        """Return the tokens of ``blockchain``, fetching them when not cached yet."""
        # NOTE(review): self._tokens is presumably initialised by StocraBase - confirm.
        if self._tokens.get(blockchain) is None:
            await self._refresh_tokens(blockchain)

        return self._tokens[blockchain]

    async def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
        """Multiply ``value`` by the ``scaling`` of the token at ``contract_address``.

        Raises:
            KeyError: If ``contract_address`` is not among the blockchain's tokens.
        """
        tokens = await self.get_tokens(blockchain)
        token = tokens[contract_address]
        return value * token.scaling

    async def _acquire(self) -> None:
        """Acquire the semaphore when one is configured."""
        if self._semaphore:
            await self._semaphore.acquire()

    def _release(self) -> None:
        """Release the semaphore when one is configured."""
        if self._semaphore:
            self._semaphore.release()

    @asynccontextmanager
    async def _with_semaphore(self) -> AsyncGenerator[None, None]:
        """Hold the optional semaphore for the duration of the ``async with`` body."""
        await self._acquire()
        try:
            yield
        finally:
            self._release()

    async def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
        """GET ``endpoint`` of ``blockchain`` and return the decoded JSON body.

        On ``ClientError`` or ``asyncio.TimeoutError`` the error handlers are
        consulted; the request is repeated for as long as a handler asks for a
        retry, otherwise the original exception is re-raised.
        """
        for iteration in count(start=1):
            try:
                # The context manager guarantees the response is released on every path.
                async with self._session.get(
                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
                    raise_for_status=True,
                    allow_redirects=False,
                    headers=self.headers,
                ) as response:
                    return cast(dict, await response.json())
            except (ClientError, asyncio.TimeoutError) as exception:
                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
                if await self._should_continue(error):
                    continue

                raise

    async def _should_continue(self, error: StocraHTTPError) -> bool:
        """Return True as soon as one error handler asks for a retry.

        Handlers may return either a ``bool`` or an awaitable resolving to one;
        only awaitable results are awaited.
        """
        # Imported locally so this class does not depend on a module-level import.
        import inspect

        if not self._error_handlers:
            return False

        for error_handler in self._error_handlers:
            retry = error_handler(error)
            if inspect.isawaitable(retry):
                retry = await cast(Awaitable[bool], retry)
            if retry:
                return True

        return False

    async def _refresh_tokens(self, blockchain: str) -> None:
        """Download the ``tokens`` endpoint and cache it as ``Token`` objects by contract address."""
        tokens = await self._get(blockchain, "tokens")
        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
Helper class that provides a standard way to create an ABC using inheritance.
Stocra( api_key: Optional[str] = None, session: Optional[aiohttp.client.ClientSession] = None, semaphore: Optional[asyncio.locks.Semaphore] = None, error_handlers: Optional[List[Callable[[stocra.models.StocraHTTPError], Union[bool, Awaitable[bool]]]]] = None)
def __init__(
    self,
    api_key: Optional[str] = None,
    session: Optional[ClientSession] = None,
    semaphore: Optional[Semaphore] = None,
    error_handlers: Optional[List[ErrorHandler]] = None,
):
    """Set up the client.

    ``api_key`` and ``error_handlers`` are handed to the base class. The given
    ``session`` is used for requests, or a new ``ClientSession`` when none is
    supplied; ``semaphore`` is stored as the optional concurrency limiter.
    """
    super().__init__(api_key=api_key, error_handlers=error_handlers)

    if session:
        self._session = session
    else:
        self._session = ClientSession()
    self._semaphore = semaphore
async def
get_block( self, blockchain: str, hash_or_height: Union[str, int] = 'latest') -> stocra.models.Block:
async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
    """Request ``blocks/{hash_or_height}`` for ``blockchain`` and build a ``Block`` from the JSON.

    The request runs while the client's optional semaphore is held.
    """
    logger.debug("%s: get_block %s", blockchain, hash_or_height)
    async with self._with_semaphore():
        payload = await self._get(
            blockchain=blockchain,
            endpoint=f"blocks/{hash_or_height}",
        )
        block = Block(**payload)
        return block
async def
get_transaction( self, blockchain: str, transaction_hash: str) -> stocra.models.Transaction:
async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
    """Request ``transactions/{transaction_hash}`` for ``blockchain`` and build a ``Transaction``.

    The request runs while the client's optional semaphore is held.
    """
    logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
    async with self._with_semaphore():
        payload = await self._get(
            blockchain=blockchain,
            endpoint=f"transactions/{transaction_hash}",
        )
        transaction = Transaction(**payload)
        return transaction
async def
get_all_transactions_of_block( self, blockchain: str, block: stocra.models.Block) -> AsyncIterable[stocra.models.Transaction]:
async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
    """Yield every transaction of ``block`` in completion order.

    One task per hash in ``block.transactions`` is started up front, so the
    yield order is whatever finishes first, not the order in the block.
    Tasks still running when the generator is closed, or when one request
    fails, are cancelled instead of being left running in the background.
    """
    logger.debug("%s: get_all_transactions %s", blockchain, block.height)
    transaction_tasks = [
        asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
        for transaction_hash in block.transactions
    ]

    try:
        for completed_task in asyncio.as_completed(transaction_tasks):
            yield await completed_task
    finally:
        # cancel() is a no-op for tasks that already finished.
        for task in transaction_tasks:
            task.cancel()
async def
stream_new_blocks( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, n_blocks_ahead: int = 1) -> AsyncIterable[stocra.models.Block]:
async def stream_new_blocks(
    self,
    blockchain: str,
    start_block_hash_or_height: Union[int, str] = "latest",
    sleep_interval_seconds: float = 10,
    n_blocks_ahead: int = 1,
) -> AsyncIterable[Block]:
    """Yield the start block and then every following block, indefinitely.

    After the start block, ``n_blocks_ahead + 1`` consecutive heights are
    requested concurrently and the window slides forward by one height per
    yielded block. A 404 for the next expected height triggers a sleep of
    ``sleep_interval_seconds`` followed by a new request for that height.

    Raises:
        ValueError: If ``n_blocks_ahead`` is smaller than 1.
        ClientResponseError: For any HTTP error other than 404.
    """
    if n_blocks_ahead < 1:
        raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")

    block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
    first_block_to_load_height = block.height + 1
    last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
    yield block

    block_tasks = [
        asyncio.create_task(self.get_block(blockchain, height))
        for height in range(first_block_to_load_height, last_block_to_load_height)
    ]

    try:
        while True:
            block_task = block_tasks.pop(0)
            try:
                # Only the fetch is guarded; the yield happens outside the try.
                next_block = await block_task
            except ClientResponseError as exception:
                if exception.status != 404:
                    raise

                logger.debug(
                    "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
                    blockchain,
                    first_block_to_load_height,
                    sleep_interval_seconds,
                )
                await asyncio.sleep(sleep_interval_seconds)
                block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
                continue

            yield next_block

            block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
            first_block_to_load_height += 1
            last_block_to_load_height += 1
    finally:
        # Do not leave prefetch requests running once the stream is abandoned or fails.
        for pending_task in block_tasks:
            pending_task.cancel()
async def
stream_new_transactions( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, load_n_blocks_ahead: int = 1) -> AsyncIterable[Tuple[stocra.models.Block, stocra.models.Transaction]]:
async def stream_new_transactions(
    self,
    blockchain: str,
    start_block_hash_or_height: Union[int, str] = "latest",
    sleep_interval_seconds: float = 10,
    load_n_blocks_ahead: int = 1,
) -> AsyncIterable[Tuple[Block, Transaction]]:
    """Stream ``(block, transaction)`` pairs for each block produced by ``stream_new_blocks``.

    ``load_n_blocks_ahead`` is handed to ``stream_new_blocks`` as
    ``n_blocks_ahead``; the remaining arguments are forwarded as they are.
    Transactions of one block are yielded as ``get_all_transactions_of_block``
    produces them.
    """
    block_stream = self.stream_new_blocks(
        blockchain=blockchain,
        start_block_hash_or_height=start_block_hash_or_height,
        sleep_interval_seconds=sleep_interval_seconds,
        n_blocks_ahead=load_n_blocks_ahead,
    )
    async for current_block in block_stream:
        async for current_transaction in self.get_all_transactions_of_block(
            blockchain=blockchain,
            block=current_block,
        ):
            yield current_block, current_transaction
async def
scale_token_value( self, blockchain: str, contract_address: str, value: decimal.Decimal) -> decimal.Decimal: