stocra.asynchronous.client

  1import asyncio
  2import logging
  3from asyncio import Semaphore
  4from contextlib import asynccontextmanager
  5from decimal import Decimal
  6from itertools import count
  7from typing import (
  8    AsyncGenerator,
  9    AsyncIterable,
 10    Awaitable,
 11    Dict,
 12    List,
 13    Optional,
 14    Tuple,
 15    Union,
 16    cast,
 17)
 18
 19from aiohttp import ClientError, ClientResponseError, ClientSession
 20
 21from stocra.base_client import StocraBase
 22from stocra.models import Block, ErrorHandler, StocraHTTPError, Token, Transaction
 23
 24logger = logging.getLogger("stocra")
 25
 26
 27class Stocra(StocraBase):
 28    _session: ClientSession
 29    _semaphore: Optional[Semaphore]
 30    _error_handlers: List[ErrorHandler]
 31
 32    def __init__(
 33        self,
 34        api_key: Optional[str] = None,
 35        session: Optional[ClientSession] = None,
 36        semaphore: Optional[Semaphore] = None,
 37        error_handlers: Optional[List[ErrorHandler]] = None,
 38    ):
 39        super().__init__(
 40            api_key=api_key,
 41            error_handlers=error_handlers,
 42        )
 43
 44        self._session = session or ClientSession()
 45        self._semaphore = semaphore
 46
 47    async def close(self) -> None:
 48        await self._session.close()
 49
 50    async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
 51        logger.debug("%s: get_block %s", blockchain, hash_or_height)
 52        async with self._with_semaphore():
 53            block_json = await self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
 54            return Block(**block_json)
 55
 56    async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
 57        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
 58        async with self._with_semaphore():
 59            transaction_json = await self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
 60            return Transaction(**transaction_json)
 61
 62    async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
 63        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
 64        transaction_tasks = []
 65
 66        for transaction_hash in block.transactions:
 67            task = asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
 68            transaction_tasks.append(task)
 69
 70        for completed_task in asyncio.as_completed(transaction_tasks):
 71            yield await completed_task
 72
 73    async def stream_new_blocks(
 74        self,
 75        blockchain: str,
 76        start_block_hash_or_height: Union[int, str] = "latest",
 77        sleep_interval_seconds: float = 10,
 78        n_blocks_ahead: int = 1,
 79    ) -> AsyncIterable[Block]:
 80        if n_blocks_ahead < 1:
 81            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 82
 83        block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 84        first_block_to_load_height = block.height + 1
 85        last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
 86        yield block
 87
 88        block_tasks = [
 89            asyncio.create_task(self.get_block(blockchain, height))
 90            for height in range(first_block_to_load_height, last_block_to_load_height)
 91        ]
 92
 93        while True:
 94            block_task = block_tasks.pop(0)
 95            try:
 96                await asyncio.wait_for(block_task, timeout=None)
 97                yield block_task.result()
 98            except ClientResponseError as exception:
 99                if exception.status == 404:
100                    logger.debug(
101                        "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
102                        blockchain,
103                        first_block_to_load_height,
104                        sleep_interval_seconds,
105                    )
106                    await asyncio.sleep(sleep_interval_seconds)
107                    block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
108                    continue
109
110                raise
111
112            block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
113            first_block_to_load_height += 1
114            last_block_to_load_height += 1
115
116    async def stream_new_transactions(
117        self,
118        blockchain: str,
119        start_block_hash_or_height: Union[int, str] = "latest",
120        sleep_interval_seconds: float = 10,
121        load_n_blocks_ahead: int = 1,
122    ) -> AsyncIterable[Tuple[Block, Transaction]]:
123        new_blocks = self.stream_new_blocks(
124            blockchain=blockchain,
125            start_block_hash_or_height=start_block_hash_or_height,
126            sleep_interval_seconds=sleep_interval_seconds,
127            n_blocks_ahead=load_n_blocks_ahead,
128        )
129        async for block in new_blocks:
130            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
131            async for transaction in block_transactions:
132                yield block, transaction
133
134    async def get_tokens(self, blockchain: str) -> Dict[str, Token]:
135        if self._tokens.get(blockchain) is None:
136            await self._refresh_tokens(blockchain)
137
138        return self._tokens[blockchain]
139
140    async def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
141        tokens = await self.get_tokens(blockchain)
142        token = tokens[contract_address]
143        return value * token.scaling
144
145    async def _acquire(self) -> None:
146        if self._semaphore:
147            await self._semaphore.acquire()
148
149    def _release(self) -> None:
150        if self._semaphore:
151            self._semaphore.release()
152
153    @asynccontextmanager
154    async def _with_semaphore(self) -> AsyncGenerator[None, None]:
155        await self._acquire()
156        try:
157            yield
158        finally:
159            self._release()
160
161    async def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
162        for iteration in count(start=1):
163            try:
164                response = await self._session.get(
165                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
166                    raise_for_status=True,
167                    allow_redirects=False,
168                    headers=self.headers,
169                )
170                return cast(dict, await response.json())
171            except (ClientError, asyncio.TimeoutError) as exception:
172                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
173                if await self._should_continue(error):
174                    continue
175
176                raise
177
178    async def _should_continue(self, error: StocraHTTPError) -> bool:
179        if not self._error_handlers:
180            return False
181
182        for error_handler in self._error_handlers:
183            retry = await cast(Awaitable[bool], error_handler(error))
184            if retry:
185                return True
186
187        return False
188
189    async def _refresh_tokens(self, blockchain: str) -> None:
190        tokens = await self._get(blockchain, "tokens")
191        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
class Stocra(stocra.base_client.StocraBase):
 28class Stocra(StocraBase):
 29    _session: ClientSession
 30    _semaphore: Optional[Semaphore]
 31    _error_handlers: List[ErrorHandler]
 32
 33    def __init__(
 34        self,
 35        api_key: Optional[str] = None,
 36        session: Optional[ClientSession] = None,
 37        semaphore: Optional[Semaphore] = None,
 38        error_handlers: Optional[List[ErrorHandler]] = None,
 39    ):
 40        super().__init__(
 41            api_key=api_key,
 42            error_handlers=error_handlers,
 43        )
 44
 45        self._session = session or ClientSession()
 46        self._semaphore = semaphore
 47
 48    async def close(self) -> None:
 49        await self._session.close()
 50
 51    async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
 52        logger.debug("%s: get_block %s", blockchain, hash_or_height)
 53        async with self._with_semaphore():
 54            block_json = await self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
 55            return Block(**block_json)
 56
 57    async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
 58        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
 59        async with self._with_semaphore():
 60            transaction_json = await self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
 61            return Transaction(**transaction_json)
 62
 63    async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
 64        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
 65        transaction_tasks = []
 66
 67        for transaction_hash in block.transactions:
 68            task = asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
 69            transaction_tasks.append(task)
 70
 71        for completed_task in asyncio.as_completed(transaction_tasks):
 72            yield await completed_task
 73
 74    async def stream_new_blocks(
 75        self,
 76        blockchain: str,
 77        start_block_hash_or_height: Union[int, str] = "latest",
 78        sleep_interval_seconds: float = 10,
 79        n_blocks_ahead: int = 1,
 80    ) -> AsyncIterable[Block]:
 81        if n_blocks_ahead < 1:
 82            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 83
 84        block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 85        first_block_to_load_height = block.height + 1
 86        last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
 87        yield block
 88
 89        block_tasks = [
 90            asyncio.create_task(self.get_block(blockchain, height))
 91            for height in range(first_block_to_load_height, last_block_to_load_height)
 92        ]
 93
 94        while True:
 95            block_task = block_tasks.pop(0)
 96            try:
 97                await asyncio.wait_for(block_task, timeout=None)
 98                yield block_task.result()
 99            except ClientResponseError as exception:
100                if exception.status == 404:
101                    logger.debug(
102                        "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
103                        blockchain,
104                        first_block_to_load_height,
105                        sleep_interval_seconds,
106                    )
107                    await asyncio.sleep(sleep_interval_seconds)
108                    block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
109                    continue
110
111                raise
112
113            block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
114            first_block_to_load_height += 1
115            last_block_to_load_height += 1
116
117    async def stream_new_transactions(
118        self,
119        blockchain: str,
120        start_block_hash_or_height: Union[int, str] = "latest",
121        sleep_interval_seconds: float = 10,
122        load_n_blocks_ahead: int = 1,
123    ) -> AsyncIterable[Tuple[Block, Transaction]]:
124        new_blocks = self.stream_new_blocks(
125            blockchain=blockchain,
126            start_block_hash_or_height=start_block_hash_or_height,
127            sleep_interval_seconds=sleep_interval_seconds,
128            n_blocks_ahead=load_n_blocks_ahead,
129        )
130        async for block in new_blocks:
131            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
132            async for transaction in block_transactions:
133                yield block, transaction
134
135    async def get_tokens(self, blockchain: str) -> Dict[str, Token]:
136        if self._tokens.get(blockchain) is None:
137            await self._refresh_tokens(blockchain)
138
139        return self._tokens[blockchain]
140
141    async def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
142        tokens = await self.get_tokens(blockchain)
143        token = tokens[contract_address]
144        return value * token.scaling
145
146    async def _acquire(self) -> None:
147        if self._semaphore:
148            await self._semaphore.acquire()
149
150    def _release(self) -> None:
151        if self._semaphore:
152            self._semaphore.release()
153
154    @asynccontextmanager
155    async def _with_semaphore(self) -> AsyncGenerator[None, None]:
156        await self._acquire()
157        try:
158            yield
159        finally:
160            self._release()
161
162    async def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
163        for iteration in count(start=1):
164            try:
165                response = await self._session.get(
166                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
167                    raise_for_status=True,
168                    allow_redirects=False,
169                    headers=self.headers,
170                )
171                return cast(dict, await response.json())
172            except (ClientError, asyncio.TimeoutError) as exception:
173                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
174                if await self._should_continue(error):
175                    continue
176
177                raise
178
179    async def _should_continue(self, error: StocraHTTPError) -> bool:
180        if not self._error_handlers:
181            return False
182
183        for error_handler in self._error_handlers:
184            retry = await cast(Awaitable[bool], error_handler(error))
185            if retry:
186                return True
187
188        return False
189
190    async def _refresh_tokens(self, blockchain: str) -> None:
191        tokens = await self._get(blockchain, "tokens")
192        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}

Helper class that provides a standard way to create an ABC using inheritance.

Stocra( api_key: Optional[str] = None, session: Optional[aiohttp.client.ClientSession] = None, semaphore: Optional[asyncio.locks.Semaphore] = None, error_handlers: Optional[List[Callable[[stocra.models.StocraHTTPError], Union[bool, Awaitable[bool]]]]] = None)
33    def __init__(
34        self,
35        api_key: Optional[str] = None,
36        session: Optional[ClientSession] = None,
37        semaphore: Optional[Semaphore] = None,
38        error_handlers: Optional[List[ErrorHandler]] = None,
39    ):
40        super().__init__(
41            api_key=api_key,
42            error_handlers=error_handlers,
43        )
44
45        self._session = session or ClientSession()
46        self._semaphore = semaphore
async def close(self) -> None:
48    async def close(self) -> None:
49        await self._session.close()
async def get_block( self, blockchain: str, hash_or_height: Union[str, int] = 'latest') -> stocra.models.Block:
51    async def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
52        logger.debug("%s: get_block %s", blockchain, hash_or_height)
53        async with self._with_semaphore():
54            block_json = await self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
55            return Block(**block_json)
async def get_transaction( self, blockchain: str, transaction_hash: str) -> stocra.models.Transaction:
57    async def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
58        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
59        async with self._with_semaphore():
60            transaction_json = await self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
61            return Transaction(**transaction_json)
async def get_all_transactions_of_block( self, blockchain: str, block: stocra.models.Block) -> AsyncIterable[stocra.models.Transaction]:
63    async def get_all_transactions_of_block(self, blockchain: str, block: Block) -> AsyncIterable[Transaction]:
64        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
65        transaction_tasks = []
66
67        for transaction_hash in block.transactions:
68            task = asyncio.create_task(self.get_transaction(blockchain=blockchain, transaction_hash=transaction_hash))
69            transaction_tasks.append(task)
70
71        for completed_task in asyncio.as_completed(transaction_tasks):
72            yield await completed_task
async def stream_new_blocks( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, n_blocks_ahead: int = 1) -> AsyncIterable[stocra.models.Block]:
 74    async def stream_new_blocks(
 75        self,
 76        blockchain: str,
 77        start_block_hash_or_height: Union[int, str] = "latest",
 78        sleep_interval_seconds: float = 10,
 79        n_blocks_ahead: int = 1,
 80    ) -> AsyncIterable[Block]:
 81        if n_blocks_ahead < 1:
 82            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 83
 84        block = await self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 85        first_block_to_load_height = block.height + 1
 86        last_block_to_load_height = first_block_to_load_height + n_blocks_ahead + 1
 87        yield block
 88
 89        block_tasks = [
 90            asyncio.create_task(self.get_block(blockchain, height))
 91            for height in range(first_block_to_load_height, last_block_to_load_height)
 92        ]
 93
 94        while True:
 95            block_task = block_tasks.pop(0)
 96            try:
 97                await asyncio.wait_for(block_task, timeout=None)
 98                yield block_task.result()
 99            except ClientResponseError as exception:
100                if exception.status == 404:
101                    logger.debug(
102                        "%s: stream_new_blocks_ahead %s: 404, sleeping for %d seconds",
103                        blockchain,
104                        first_block_to_load_height,
105                        sleep_interval_seconds,
106                    )
107                    await asyncio.sleep(sleep_interval_seconds)
108                    block_tasks.insert(0, asyncio.create_task(self.get_block(blockchain, first_block_to_load_height)))
109                    continue
110
111                raise
112
113            block_tasks.append(asyncio.create_task(self.get_block(blockchain, last_block_to_load_height)))
114            first_block_to_load_height += 1
115            last_block_to_load_height += 1
async def stream_new_transactions( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, load_n_blocks_ahead: int = 1) -> AsyncIterable[Tuple[stocra.models.Block, stocra.models.Transaction]]:
117    async def stream_new_transactions(
118        self,
119        blockchain: str,
120        start_block_hash_or_height: Union[int, str] = "latest",
121        sleep_interval_seconds: float = 10,
122        load_n_blocks_ahead: int = 1,
123    ) -> AsyncIterable[Tuple[Block, Transaction]]:
124        new_blocks = self.stream_new_blocks(
125            blockchain=blockchain,
126            start_block_hash_or_height=start_block_hash_or_height,
127            sleep_interval_seconds=sleep_interval_seconds,
128            n_blocks_ahead=load_n_blocks_ahead,
129        )
130        async for block in new_blocks:
131            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
132            async for transaction in block_transactions:
133                yield block, transaction
async def get_tokens(self, blockchain: str) -> Dict[str, stocra.models.Token]:
135    async def get_tokens(self, blockchain: str) -> Dict[str, Token]:
136        if self._tokens.get(blockchain) is None:
137            await self._refresh_tokens(blockchain)
138
139        return self._tokens[blockchain]
async def scale_token_value( self, blockchain: str, contract_address: str, value: decimal.Decimal) -> decimal.Decimal:
141    async def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
142        tokens = await self.get_tokens(blockchain)
143        token = tokens[contract_address]
144        return value * token.scaling