stocra.synchronous.client

  1import logging
  2from concurrent.futures import Executor, as_completed
  3from decimal import Decimal
  4from itertools import count
  5from time import sleep
  6from typing import Dict, Iterable, List, Optional, Tuple, Union, cast
  7
  8from requests import HTTPError, RequestException, Session
  9
 10from stocra.base_client import StocraBase
 11from stocra.models import Block, ErrorHandler, StocraHTTPError, Token, Transaction
 12
 13logger = logging.getLogger("stocra")
 14
 15
 16class Stocra(StocraBase):
 17    _session: Session
 18    _executor: Optional[Executor]
 19
 20    def __init__(
 21        self,
 22        api_key: Optional[str] = None,
 23        session: Optional[Session] = None,
 24        executor: Optional[Executor] = None,
 25        error_handlers: Optional[List[ErrorHandler]] = None,
 26    ):
 27        super().__init__(
 28            api_key=api_key,
 29            error_handlers=error_handlers,
 30        )
 31        self._session = session or Session()
 32        self._executor = executor
 33
 34    def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
 35        logger.debug("%s: get_block %s", blockchain, hash_or_height)
 36        block_json = self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
 37        return Block(**block_json)
 38
 39    def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
 40        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
 41        transaction_json = self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
 42        return Transaction(**transaction_json)
 43
 44    def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]:
 45        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
 46        if self._executor:
 47            futures = [
 48                self._executor.submit(self.get_transaction, blockchain, transaction_hash)
 49                for transaction_hash in block.transactions
 50            ]
 51            for transaction_task in as_completed(futures):
 52                yield transaction_task.result()
 53        else:
 54            for transaction_hash in block.transactions:
 55                yield self.get_transaction(blockchain, transaction_hash)
 56
 57    def stream_new_blocks(
 58        self,
 59        blockchain: str,
 60        start_block_hash_or_height: Union[int, str] = "latest",
 61        sleep_interval_seconds: float = 10,
 62    ) -> Iterable[Block]:
 63        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 64        next_block_height = block.height + 1
 65        yield block
 66
 67        while True:
 68            try:
 69                block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height)
 70            except HTTPError as exception:
 71                if exception.response.status_code == 404:
 72                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
 73                    continue
 74
 75                raise
 76
 77            next_block_height += 1
 78            yield block
 79
 80    def stream_new_blocks_ahead(
 81        self,
 82        blockchain: str,
 83        start_block_hash_or_height: Union[int, str] = "latest",
 84        sleep_interval_seconds: float = 10,
 85        n_blocks_ahead: int = 10,
 86    ) -> Iterable[Block]:
 87        if not self._executor:
 88            raise Exception("Works only with executor")
 89
 90        if n_blocks_ahead < 1:
 91            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 92
 93        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 94        next_block_height = block.height + 1
 95        last_block_height = next_block_height + n_blocks_ahead + 1
 96        yield block
 97
 98        block_tasks = [
 99            self._executor.submit(self.get_block, blockchain, height)
100            for height in range(next_block_height, last_block_height)
101        ]
102
103        while True:
104            block_task = block_tasks.pop(0)
105            try:
106                yield block_task.result()
107            except HTTPError as exception:
108                if exception.response.status_code == 404:
109                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
110                    block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height))
111                    continue
112
113                raise
114
115            block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height))
116            next_block_height += 1
117            last_block_height += 1
118
119    def stream_new_transactions(
120        self,
121        blockchain: str,
122        start_block_hash_or_height: Union[int, str] = "latest",
123        sleep_interval_seconds: float = 10,
124        load_n_blocks_ahead: Optional[int] = None,
125    ) -> Iterable[Tuple[Block, Transaction]]:
126        if load_n_blocks_ahead:
127            new_blocks = self.stream_new_blocks_ahead(
128                blockchain=blockchain,
129                start_block_hash_or_height=start_block_hash_or_height,
130                sleep_interval_seconds=sleep_interval_seconds,
131                n_blocks_ahead=load_n_blocks_ahead,
132            )
133        else:
134            new_blocks = self.stream_new_blocks(
135                blockchain=blockchain,
136                start_block_hash_or_height=start_block_hash_or_height,
137                sleep_interval_seconds=sleep_interval_seconds,
138            )
139
140        for block in new_blocks:
141            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
142            for transaction in block_transactions:
143                yield block, transaction
144
145    def get_tokens(self, blockchain: str) -> Dict[str, Token]:
146        if self._tokens.get(blockchain) is None:
147            self._refresh_tokens(blockchain)
148
149        return self._tokens[blockchain]
150
151    def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
152        token = self.get_tokens(blockchain)[contract_address]
153        return value * token.scaling
154
155    def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
156        for iteration in count(start=1):
157            try:
158                response = self._session.get(
159                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
160                    allow_redirects=False,
161                    headers=self.headers,
162                )
163                response.raise_for_status()
164                return cast(dict, response.json())
165            except RequestException as exception:
166                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
167                if self._should_continue(error):
168                    continue
169
170                raise
171
172    def _should_continue(self, error: StocraHTTPError) -> bool:
173        if not self._error_handlers:
174            return False
175
176        for error_handler in self._error_handlers:
177            retry = error_handler(error)
178            if retry:
179                return True
180
181        return False
182
183    @classmethod
184    def _handle_404_during_block_streaming(
185        cls, blockchain: str, block_height: int, sleep_interval_seconds: float
186    ) -> None:
187        logger.debug(
188            "%s: stream_new_blocks %s: 404, sleeping for %d seconds",
189            blockchain,
190            block_height,
191            sleep_interval_seconds,
192        )
193        sleep(sleep_interval_seconds)
194
195    def _refresh_tokens(self, blockchain: str) -> None:
196        tokens = self._get(blockchain, "tokens")
197        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}
class Stocra(stocra.base_client.StocraBase):
 17class Stocra(StocraBase):
 18    _session: Session
 19    _executor: Optional[Executor]
 20
 21    def __init__(
 22        self,
 23        api_key: Optional[str] = None,
 24        session: Optional[Session] = None,
 25        executor: Optional[Executor] = None,
 26        error_handlers: Optional[List[ErrorHandler]] = None,
 27    ):
 28        super().__init__(
 29            api_key=api_key,
 30            error_handlers=error_handlers,
 31        )
 32        self._session = session or Session()
 33        self._executor = executor
 34
 35    def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
 36        logger.debug("%s: get_block %s", blockchain, hash_or_height)
 37        block_json = self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
 38        return Block(**block_json)
 39
 40    def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
 41        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
 42        transaction_json = self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
 43        return Transaction(**transaction_json)
 44
 45    def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]:
 46        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
 47        if self._executor:
 48            futures = [
 49                self._executor.submit(self.get_transaction, blockchain, transaction_hash)
 50                for transaction_hash in block.transactions
 51            ]
 52            for transaction_task in as_completed(futures):
 53                yield transaction_task.result()
 54        else:
 55            for transaction_hash in block.transactions:
 56                yield self.get_transaction(blockchain, transaction_hash)
 57
 58    def stream_new_blocks(
 59        self,
 60        blockchain: str,
 61        start_block_hash_or_height: Union[int, str] = "latest",
 62        sleep_interval_seconds: float = 10,
 63    ) -> Iterable[Block]:
 64        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 65        next_block_height = block.height + 1
 66        yield block
 67
 68        while True:
 69            try:
 70                block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height)
 71            except HTTPError as exception:
 72                if exception.response.status_code == 404:
 73                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
 74                    continue
 75
 76                raise
 77
 78            next_block_height += 1
 79            yield block
 80
 81    def stream_new_blocks_ahead(
 82        self,
 83        blockchain: str,
 84        start_block_hash_or_height: Union[int, str] = "latest",
 85        sleep_interval_seconds: float = 10,
 86        n_blocks_ahead: int = 10,
 87    ) -> Iterable[Block]:
 88        if not self._executor:
 89            raise Exception("Works only with executor")
 90
 91        if n_blocks_ahead < 1:
 92            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 93
 94        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 95        next_block_height = block.height + 1
 96        last_block_height = next_block_height + n_blocks_ahead + 1
 97        yield block
 98
 99        block_tasks = [
100            self._executor.submit(self.get_block, blockchain, height)
101            for height in range(next_block_height, last_block_height)
102        ]
103
104        while True:
105            block_task = block_tasks.pop(0)
106            try:
107                yield block_task.result()
108            except HTTPError as exception:
109                if exception.response.status_code == 404:
110                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
111                    block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height))
112                    continue
113
114                raise
115
116            block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height))
117            next_block_height += 1
118            last_block_height += 1
119
120    def stream_new_transactions(
121        self,
122        blockchain: str,
123        start_block_hash_or_height: Union[int, str] = "latest",
124        sleep_interval_seconds: float = 10,
125        load_n_blocks_ahead: Optional[int] = None,
126    ) -> Iterable[Tuple[Block, Transaction]]:
127        if load_n_blocks_ahead:
128            new_blocks = self.stream_new_blocks_ahead(
129                blockchain=blockchain,
130                start_block_hash_or_height=start_block_hash_or_height,
131                sleep_interval_seconds=sleep_interval_seconds,
132                n_blocks_ahead=load_n_blocks_ahead,
133            )
134        else:
135            new_blocks = self.stream_new_blocks(
136                blockchain=blockchain,
137                start_block_hash_or_height=start_block_hash_or_height,
138                sleep_interval_seconds=sleep_interval_seconds,
139            )
140
141        for block in new_blocks:
142            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
143            for transaction in block_transactions:
144                yield block, transaction
145
146    def get_tokens(self, blockchain: str) -> Dict[str, Token]:
147        if self._tokens.get(blockchain) is None:
148            self._refresh_tokens(blockchain)
149
150        return self._tokens[blockchain]
151
152    def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
153        token = self.get_tokens(blockchain)[contract_address]
154        return value * token.scaling
155
156    def _get(self, blockchain: str, endpoint: str) -> dict:  # type: ignore[return]
157        for iteration in count(start=1):
158            try:
159                response = self._session.get(
160                    f"https://{blockchain}.stocra.com/v1.0/{endpoint}",
161                    allow_redirects=False,
162                    headers=self.headers,
163                )
164                response.raise_for_status()
165                return cast(dict, response.json())
166            except RequestException as exception:
167                error = StocraHTTPError(endpoint=endpoint, iteration=iteration, exception=exception)
168                if self._should_continue(error):
169                    continue
170
171                raise
172
173    def _should_continue(self, error: StocraHTTPError) -> bool:
174        if not self._error_handlers:
175            return False
176
177        for error_handler in self._error_handlers:
178            retry = error_handler(error)
179            if retry:
180                return True
181
182        return False
183
184    @classmethod
185    def _handle_404_during_block_streaming(
186        cls, blockchain: str, block_height: int, sleep_interval_seconds: float
187    ) -> None:
188        logger.debug(
189            "%s: stream_new_blocks %s: 404, sleeping for %d seconds",
190            blockchain,
191            block_height,
192            sleep_interval_seconds,
193        )
194        sleep(sleep_interval_seconds)
195
196    def _refresh_tokens(self, blockchain: str) -> None:
197        tokens = self._get(blockchain, "tokens")
198        self._tokens[blockchain] = {contract_address: Token(**token) for contract_address, token in tokens.items()}

Synchronous client for the Stocra API. (Description inherited from `abc.ABC`: Helper class that provides a standard way to create an ABC using inheritance.)

Stocra( api_key: Optional[str] = None, session: Optional[requests.sessions.Session] = None, executor: Optional[concurrent.futures._base.Executor] = None, error_handlers: Optional[List[Callable[[stocra.models.StocraHTTPError], Union[bool, Awaitable[bool]]]]] = None)
21    def __init__(
22        self,
23        api_key: Optional[str] = None,
24        session: Optional[Session] = None,
25        executor: Optional[Executor] = None,
26        error_handlers: Optional[List[ErrorHandler]] = None,
27    ):
28        super().__init__(
29            api_key=api_key,
30            error_handlers=error_handlers,
31        )
32        self._session = session or Session()
33        self._executor = executor
def get_block( self, blockchain: str, hash_or_height: Union[str, int] = 'latest') -> stocra.models.Block:
35    def get_block(self, blockchain: str, hash_or_height: Union[str, int] = "latest") -> Block:
36        logger.debug("%s: get_block %s", blockchain, hash_or_height)
37        block_json = self._get(blockchain=blockchain, endpoint=f"blocks/{hash_or_height}")
38        return Block(**block_json)
def get_transaction( self, blockchain: str, transaction_hash: str) -> stocra.models.Transaction:
40    def get_transaction(self, blockchain: str, transaction_hash: str) -> Transaction:
41        logger.debug("%s: get_transaction %s", blockchain, transaction_hash)
42        transaction_json = self._get(blockchain=blockchain, endpoint=f"transactions/{transaction_hash}")
43        return Transaction(**transaction_json)
def get_all_transactions_of_block( self, blockchain: str, block: stocra.models.Block) -> Iterable[stocra.models.Transaction]:
45    def get_all_transactions_of_block(self, blockchain: str, block: Block) -> Iterable[Transaction]:
46        logger.debug("%s: get_all_transactions %s", blockchain, block.height)
47        if self._executor:
48            futures = [
49                self._executor.submit(self.get_transaction, blockchain, transaction_hash)
50                for transaction_hash in block.transactions
51            ]
52            for transaction_task in as_completed(futures):
53                yield transaction_task.result()
54        else:
55            for transaction_hash in block.transactions:
56                yield self.get_transaction(blockchain, transaction_hash)
def stream_new_blocks( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10) -> Iterable[stocra.models.Block]:
58    def stream_new_blocks(
59        self,
60        blockchain: str,
61        start_block_hash_or_height: Union[int, str] = "latest",
62        sleep_interval_seconds: float = 10,
63    ) -> Iterable[Block]:
64        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
65        next_block_height = block.height + 1
66        yield block
67
68        while True:
69            try:
70                block = self.get_block(blockchain=blockchain, hash_or_height=next_block_height)
71            except HTTPError as exception:
72                if exception.response.status_code == 404:
73                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
74                    continue
75
76                raise
77
78            next_block_height += 1
79            yield block
def stream_new_blocks_ahead( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, n_blocks_ahead: int = 10) -> Iterable[stocra.models.Block]:
 81    def stream_new_blocks_ahead(
 82        self,
 83        blockchain: str,
 84        start_block_hash_or_height: Union[int, str] = "latest",
 85        sleep_interval_seconds: float = 10,
 86        n_blocks_ahead: int = 10,
 87    ) -> Iterable[Block]:
 88        if not self._executor:
 89            raise Exception("Works only with executor")
 90
 91        if n_blocks_ahead < 1:
 92            raise ValueError(f"`n_blocks_ahead` must be greater than 0. Got `{n_blocks_ahead}`")
 93
 94        block = self.get_block(blockchain=blockchain, hash_or_height=start_block_hash_or_height)
 95        next_block_height = block.height + 1
 96        last_block_height = next_block_height + n_blocks_ahead + 1
 97        yield block
 98
 99        block_tasks = [
100            self._executor.submit(self.get_block, blockchain, height)
101            for height in range(next_block_height, last_block_height)
102        ]
103
104        while True:
105            block_task = block_tasks.pop(0)
106            try:
107                yield block_task.result()
108            except HTTPError as exception:
109                if exception.response.status_code == 404:
110                    self._handle_404_during_block_streaming(blockchain, next_block_height, sleep_interval_seconds)
111                    block_tasks.insert(0, self._executor.submit(self.get_block, blockchain, next_block_height))
112                    continue
113
114                raise
115
116            block_tasks.append(self._executor.submit(self.get_block, blockchain, last_block_height))
117            next_block_height += 1
118            last_block_height += 1
def stream_new_transactions( self, blockchain: str, start_block_hash_or_height: Union[int, str] = 'latest', sleep_interval_seconds: float = 10, load_n_blocks_ahead: Optional[int] = None) -> Iterable[Tuple[stocra.models.Block, stocra.models.Transaction]]:
120    def stream_new_transactions(
121        self,
122        blockchain: str,
123        start_block_hash_or_height: Union[int, str] = "latest",
124        sleep_interval_seconds: float = 10,
125        load_n_blocks_ahead: Optional[int] = None,
126    ) -> Iterable[Tuple[Block, Transaction]]:
127        if load_n_blocks_ahead:
128            new_blocks = self.stream_new_blocks_ahead(
129                blockchain=blockchain,
130                start_block_hash_or_height=start_block_hash_or_height,
131                sleep_interval_seconds=sleep_interval_seconds,
132                n_blocks_ahead=load_n_blocks_ahead,
133            )
134        else:
135            new_blocks = self.stream_new_blocks(
136                blockchain=blockchain,
137                start_block_hash_or_height=start_block_hash_or_height,
138                sleep_interval_seconds=sleep_interval_seconds,
139            )
140
141        for block in new_blocks:
142            block_transactions = self.get_all_transactions_of_block(blockchain=blockchain, block=block)
143            for transaction in block_transactions:
144                yield block, transaction
def get_tokens(self, blockchain: str) -> Dict[str, stocra.models.Token]:
146    def get_tokens(self, blockchain: str) -> Dict[str, Token]:
147        if self._tokens.get(blockchain) is None:
148            self._refresh_tokens(blockchain)
149
150        return self._tokens[blockchain]
def scale_token_value( self, blockchain: str, contract_address: str, value: decimal.Decimal) -> decimal.Decimal:
152    def scale_token_value(self, blockchain: str, contract_address: str, value: Decimal) -> Decimal:
153        token = self.get_tokens(blockchain)[contract_address]
154        return value * token.scaling