Source code for scratchattach.eventhandlers.cloud_requests

"""CloudRequests class (threading.Event version)"""
from __future__ import annotations

from .cloud_events import CloudEvents
from scratchattach.site import project
from threading import Thread, Event, current_thread
import time
import random
import traceback
from scratchattach.utils.encoder import Encoding
from scratchattach.utils import exceptions

[docs] class Request: """ Saves a request added to the request handler """
[docs] def __init__(self, request_name, *, on_call, cloud_requests, thread=True, enabled=True, response_priority=0, debug=False): self.name = request_name self.on_call = on_call self.thread = thread self.enabled = enabled self.response_priority = response_priority self.cloud_requests = cloud_requests # the corresponding CloudRequests object self.debug = debug or self.cloud_requests.debug
def __call__(self, received_request): if not self.enabled: self.cloud_requests.call_event("on_disabled_request", [received_request]) try: current_thread().setName(received_request.request_id) output = self.on_call(*received_request.arguments) self.cloud_requests.request_outputs.append({"receive":received_request.timestamp, "request_id":received_request.request_id, "output":output, "priority":self.response_priority}) except Exception as e: self.cloud_requests.call_event("on_error", [received_request, e]) if self.cloud_requests.ignore_exceptions: print( f"Warning: Caught error in request '{self.name}' - Full error below" ) try: traceback.print_exc() except Exception: print(e) else: print(f"Exception in request '{self.name}':") raise(e) if self.debug: traceback_full = traceback.format_exc().splitlines() output = [f"Error in request {self.name}", "Traceback: "] output.extend(traceback_full) self.cloud_requests.request_outputs.append({"receive":received_request.timestamp, "request_id":received_request.request_id, "output":output, "priority":self.response_priority}) else: self.cloud_requests.request_outputs.append({"receive":received_request.timestamp, "request_id":received_request.request_id, "output":[f"Error in request {self.name}","Check the Python console"], "priority":self.response_priority}) self.cloud_requests.responder_event.set() # Activate the .cloud_requests._responder process so it sends back the data to Scratch
[docs] class ReceivedRequest:
[docs] def __init__(self, **entries): self.__dict__.update(entries)
[docs] class CloudRequests(CloudEvents): # The CloudRequests class is built upon CloudEvents, similar to how Filterbot is built upon MessageEvents
[docs] def __init__(self, cloud, used_cloud_vars=["1", "2", "3", "4", "5", "6", "7", "8", "9"], no_packet_loss=False, respond_order="receive", debug=False): super().__init__(cloud) # Setup self._requests = {} self.event(self.on_set, thread=False) self.event(self.on_reconnect, thread=True) self.no_packet_loss = no_packet_loss # When enabled, query the clouddata log regularly for missed requests and reconnect after every single request (reduces packet loss a lot, but is spammy and can make response duration longer) self.used_cloud_vars = used_cloud_vars self.respond_order = respond_order self.debug = debug # Lists and dicts for saving request-related stuff self.request_parts = {} # Dict (key: request_id) for saving the parts of the requests not fully received yet self.received_requests = [] # List saving the requests that have been fully received, but not executed yet (as ReceivedRequest objects). Requests that run in threads will never be put into this list, but are executed directly. self.executed_requests = {} # Dict (key: request_id) saving the request that are currently being executed and have not been responded yet (as ReceivedRequest objects) self.request_outputs = [] # List for the output data returned by the requests (so the thread sending it back to Scratch can access it) self.responded_request_ids = [] # Saves the last 15 request ids that have been responded to. This prevents double responses then using the clouddata logs as 2nd source for preventing packet loss self.packet_memory = [] # Saves the last 15 responses so the Scratch project can re-request packets that weren't received self._packets_to_resend = [] # threading Event objects used to block threads until they are needed (lower CPU usage compared to a busy-sleep event queue)# self.executer_event = Event() self.responder_event = Event() # Start ._executer and ._responder threads (these threads are remain blocked until cloud activity is received and don't consume any CPU) self.executer_thread = Thread(target=self._executer) self.responder_thread = Thread(target=self._responder) self.executer_thread.start() self.responder_thread.start() self.current_var = 0 # ID of the last set FROM_HOST_ variable (when a response is sent back to Scratch, these are set cyclically) self.credit_check() self.hard_stop = False # When set to True, all processes will halt immediately without finishing safely (can result in not fully received / responded requests etc.)
# -- Adding and removing requests --
[docs] def request(self, function=None, *, enabled=True, name=None, thread=True, response_priority=0, debug=False): """ Decorator function. Adds a request to the request handler. """ def inner(function): # called if the decorator provides arguments self._requests[function.__name__ if name is None else name] = Request( function.__name__ if name is None else name, enabled = enabled, thread=thread, response_priority=response_priority, on_call=function, cloud_requests=self, debug=debug ) if function is None: # => the decorator provides arguments return inner else: # => the decorator doesn't provide arguments inner(function)
[docs] def add_request(self, function, *, enabled=True, name=None): self.request(enabled=enabled, name=name)(function)
[docs] def remove_request(self, name): try: self._requests.pop(name) except Exception: raise ValueError( f"No request with name {name} found to remove" )
# -- Parse and send back the request output --
[docs] def _parse_output(self, request_name, output, request_id): """ Prepares the transmission of the request output to the Scratch project """ if len(str(output)) > 3000: print( f"Warning: Output of request '{request_name}' is longer than 3000 characters (length: {len(str(output))} characters). Responding the request will take >4 seconds." ) if str(request_id).endswith("0"): try: int(output) == output except Exception: send_as_integer = False else: send_as_integer = not ("-" in str(output)) and not isinstance(output, bool) else: send_as_integer = False if output is None: print(f"Warning: Request '{request_name}' didn't return anything.") return elif send_as_integer: output = str(output) elif not isinstance(output, list): if output == "": output = "-" output = Encoding.encode(output) else: input = output output = "" for i in input: output += Encoding.encode(i) output += "89" self._respond(request_id, output, validation=3222 if send_as_integer else 2222)
[docs] def _set_FROM_HOST_var(self, value): try: self.cloud.set_var(f"FROM_HOST_{self.used_cloud_vars[self.current_var]}", value) except exceptions.CloudConnectionError: self.call_event("on_disconnect") except Exception as e: print("scratchattach: internal error while responding (please submit a bug report on GitHub):", e) self.current_var += 1 if self.current_var == len(self.used_cloud_vars): self.current_var = 0 time.sleep(self.cloud.ws_shortterm_ratelimit)
[docs] def _respond(self, request_id, response, *, validation=2222): """ Sends back the request response to the Scratch project """ if (self.cloud.last_var_set + 8 < time.time() # if the cloud connection has been idle for too long, a reconnect is necessary to make sure the first package will not be lost ) or self.no_packet_loss: self.cloud.reconnect() memory = {"rid":request_id} remaining_response = str(response) length_limit = self.cloud.length_limit - (len(str(request_id))+6) # the subtrahend is the worst-case length of the "."+numbers after the "." i = 0 while not remaining_response == "": if len(remaining_response) > length_limit: response_part = remaining_response[:length_limit] remaining_response = remaining_response[length_limit:] i += 1 if i > 99: iteration_string = str(i) elif i > 9: iteration_string = "0" + str(i) else: iteration_string = "00" + str(i) value_to_send = f"{response_part}.{request_id}{iteration_string}1" memory[i] = value_to_send self._set_FROM_HOST_var(value_to_send) else: self._set_FROM_HOST_var(f"{remaining_response}.{request_id}{validation}") self.packet_memory.append(memory) if len(self.packet_memory) > 15: self.packet_memory.pop(0) remaining_response = "" if self.hard_stop: # stop immediately without exiting safely break
[docs] def _request_packet_from_memory(self, request_id, packet_id): memory = list(filter(lambda x : x["rid"] == request_id, self.packet_memory)) if len(memory) > 0: self._packets_to_resend.append(memory[0][int(packet_id)]) self.responder_event.set() # activate _responder process
# -- Register and handle incoming requests --
[docs] def on_set(self, activity): """ This function is automatically called on cloud activites by the underlying cloud events that this CloudRequests class inherits from It registers incoming cloud activity and (if request.thread is True) runs them directly or (else) adds detected request to the .received_requests list """ # Note for contributors: All functions called in this on_set function MUST be executed in threads because this function blocks the cloud events receiver, which is usually not a problem (because of the websocket buffer) but can cause problems in rare cases if activity.var == "TO_HOST" and "." in activity.value: # Parsing the received request raw_request, request_id = activity.value.split(".") if len(request_id) == 8 and request_id[-1] == "9": # A lost packet was re-requested self._request_packet_from_memory(request_id[1:], int(raw_request)) return if request_id in self.responded_request_ids: # => The received request has already been answered, meaning this activity has already been received return if activity.value[0] == "-": # => The received request is actually part of a bigger request if not request_id in self.request_parts: self.request_parts[request_id] = [] self.request_parts[request_id].append(raw_request[1:]) return self.responded_request_ids.insert(0, request_id) self.responded_request_ids = self.responded_request_ids[:35] # If the request consists of multiple parts: Put together the parts to get the whole raw request string _raw_request = "" if request_id in self.request_parts: data = self.request_parts[request_id] for i in data: _raw_request += i self.request_parts.pop(request_id) raw_request = _raw_request + raw_request # Decode request and parse arguemtns: request = Encoding.decode(raw_request) arguments = request.split("&") request_name = arguments.pop(0)# # Check if the request is unknown: if request_name not in self._requests: print( f"Warning: Client received an unknown request called '{request_name}'" ) self.call_event("on_unknown_request", [ ReceivedRequest(request_name=request, requester=activity.user, timestamp=activity.timestamp, arguments=arguments, request_id=request_id, activity=activity ) ]) return received_request = ReceivedRequest( request = self._requests[request_name], requester=activity.user, timestamp=activity.timestamp, arguments=arguments, request_id=request_id, activity=activity ) self.call_event("on_request", [received_request]) if received_request.request.thread: self.executed_requests[request_id] = received_request Thread(target=received_request.request, args=[received_request]).start() # Execute the request function directly in a thread else: self.received_requests.append(received_request) self.executer_event.set() # Activate the ._executer process so that it handles the received request
[docs] def _executer(self): """ A process that detects new requests in .received_requests, moves them to .executed_requests and executes them. Only requests not running in threads are handled in this process. """ # If .no_packet_loss is enabled and the cloud provides logs, the logs are used to check whether there are cloud activities that were not received over the cloud connection used by the underlying cloud events use_extra_data = (self.no_packet_loss and hasattr(self.cloud, "logs")) self.executer_event.wait() # Wait for requests to be received while self.executer_thread is not None: # If self.executer_thread is None, it means cloud requests were stopped using .stop() self.executer_event.clear() if self.received_requests == [] and use_extra_data: Thread(target=self.on_reconnect).start() while self.received_requests != []: received_request = self.received_requests.pop(0) self.executed_requests[received_request.request_id] = received_request received_request.request(received_request) # Execute the request function if use_extra_data: Thread(target=self.on_reconnect).start() if self.hard_stop: # stop immediately without exiting safely break self.executer_event.wait(timeout = 2.5 if use_extra_data else None) # Wait for requests to be received
[docs] def _responder(self): """ A process that detects incoming request outputs in .request_outputs and handles them by sending them back to the Scratch project, also removes the corresponding ReceivedRequest object from .executed_requests """ while self.responder_thread is not None: # If self.responder_thread is None, it means cloud requests were stopped using .stop() self.responder_event.wait() # Wait for executed requests to respond self.responder_event.clear() while self._packets_to_resend != []: self._set_FROM_HOST_var(self._packets_to_resend.pop(0)) if self.hard_stop: # stop immediately without exiting safely break while self.request_outputs != []: if self.respond_order == "finish": output_obj = self.request_outputs.pop(0) else: output_obj = min(self.request_outputs, key=lambda x : x[self.respond_order]) self.request_outputs.remove(output_obj) if output_obj["request_id"] in self.executed_requests: received_request = self.executed_requests.pop(output_obj["request_id"]) self._parse_output(received_request, output_obj["output"], output_obj["request_id"]) else: self._parse_output("[sent from backend]", output_obj["output"], output_obj["request_id"]) if self.hard_stop: # stop immediately without exiting safely break
[docs] def on_reconnect(self): """ Called when the underlying cloud events reconnect. Makes sure that no requests are missed in this case. """ try: extradata = self.cloud.logs(limit=35)[::-1] # Reverse result so oldest activity is first for activity in extradata: if activity.timestamp < self.startup_time: continue self.on_set(activity) # Read in the fetched activity except Exception: pass
# -- Functions to be used in requests to get info about the request --
[docs] def get_requester(self): """ Can be used inside a request to get the username that performed the request. """ activity = self.executed_requests[current_thread().name].activity if activity.user is None: activity.load_log_data() return activity.user
[docs] def get_timestamp(self): """ Can be used inside a request to get the timestamp of when the request was received. """ activity = self.executed_requests[current_thread().name].activity return activity.timestamp
[docs] def get_exact_timestamp(self): """ Can be used inside a request to get the exact timestamp of when the request was performed. """ activity = self.executed_requests[current_thread().name].activity activity.load_log_data() return activity.timestamp
# -- Other stuff --
[docs] def send(self, data, *, priority=0): """ Send data to the Scratch project without a priorly received request. The Scratch project will only receive the data if it's running. """ self.request_outputs.append({"receive":time.time()*1000, "request_id":"100000000"+str(random.randint(1000, 9999)), "output":data, "priority":priority}) self.responder_event.set() # activate _responder process # Prevent user from breaking cloud requests by sending too fast (automatically increase wait time if the server can't keep up): if len(self.request_outputs) > 20: time.sleep(0.5) if len(self.request_outputs) > 15: time.sleep(0.2) if len(self.request_outputs) > 10: time.sleep(0.13) elif len(self.request_outputs) > 3: time.sleep(0.1) else: time.sleep(0.07)
[docs] def stop(self): """ Stops the request handler and all associated threads forever. Lets running response sending processes finish. """ # Override the .stop function from BaseEventHandler to make sure the ._executer and ._responder threads are also terminated super().stop() self.executer_thread = None self.responder_thread = None self.executer_event.set() self.responder_event.set()
[docs] def hard_stop(self): """ Stops the request handler and all associated threads forever. Stops running response sending processes immediately. """ self.hard_stop = True self.stop() time.sleep(0.5) self.hard_stop = False
[docs] def credit_check(self): try: p = project.Project(id=self.cloud.project_id) if not p.update(): # can't get project, probably because it's unshared (no authentication is used for getting it) print("If you use cloud requests or cloud storages, please credit TimMcCool!") return description = (str(p.instructions) + str(p.notes)).lower() if not ("timmccool" in description or "timmcool" in description or "timccool" in description or "timcool" in description): print("It was detected that no credit was given in the project description! Please credit TimMcCool when using CloudRequests.") else: print("Thanks for giving credit for CloudRequests!") except Exception: print("If you use CloudRequests, please credit TimMcCool!")
[docs] def run(self): # Was changed to .start(), but .run() is kept for backwards compatibility print("Warning: requests.run() was changed to requests.start() in v2.0. .run() will be removed in a future version") self.start()