Source code for scratchattach.other.project_json_capabilities

"""Project JSON reading and editing capabilities.
This code is still in BETA, there are still bugs and potential consistency issues to be fixed. New features will be added."""

# Note: You may want to make this into multiple files for better organisation
from __future__ import annotations

import hashlib
import json
import random
import string
import zipfile
from abc import ABC, abstractmethod
from scratchattach.utils import exceptions
from scratchattach.utils.commons import empty_project_json
from scratchattach.utils.requests import requests
# noinspection PyPep8Naming
[docs] def load_components(json_data: list, ComponentClass: type, target_list: list): for element in json_data: component = ComponentClass() component.from_json(element) target_list.append(component)
[docs] class ProjectBody:
[docs] class BaseProjectBodyComponent(ABC):
[docs] def __init__(self, **entries): # Attributes every object needs to have: self.id = None # Update attributes from entries dict: self.__dict__.update(entries)
[docs] @abstractmethod def from_json(self, data: dict): pass
[docs] @abstractmethod def to_json(self): pass
[docs] def _generate_new_id(self): """ Generates a new id and updates the id. Warning: When done on Block objects, the next_id attribute of the parent block and the parent_id attribute of the next block will NOT be updated by this method. """ self.id = ''.join(random.choices(string.ascii_letters + string.digits, k=20))
[docs] class Block(BaseProjectBodyComponent): # Thanks to @MonkeyBean2 for some scripts
[docs] def from_json(self, data: dict): self.opcode = data["opcode"] # The name of the block self.next_id = data.get("next", None) # The id of the block attached below this block self.parent_id = data.get("parent", None) # The id of the block that this block is attached to self.input_data = data.get("inputs", None) # The blocks inside of the block (if the block is a loop or an if clause for example) self.fields = data.get("fields", None) # The values inside the block's inputs self.shadow = data.get("shadow", False) # Whether the block is displayed with a shadow self.topLevel = data.get("topLevel", False) # Whether the block has no parent self.mutation = data.get("mutation", None) # For custom blocks self.x = data.get("x", None) # x position if topLevel self.y = data.get("y", None) # y position if topLevel
[docs] def to_json(self): output = {"opcode": self.opcode, "next": self.next_id, "parent": self.parent_id, "inputs": self.input_data, "fields": self.fields, "shadow": self.shadow, "topLevel": self.topLevel, "mutation": self.mutation, "x": self.x, "y": self.y} return {k: v for k, v in output.items() if v}
[docs] def attached_block(self): return self.sprite.block_by_id(self.next_id)
[docs] def previous_block(self): return self.sprite.block_by_id(self.parent_id)
[docs] def top_level_block(self): block = self return block
[docs] def previous_chain(self): # to implement: a method that detects circular block chains (to make sure this method terminates) chain = [] block = self while block.parent_id is not None: block = block.previous_block() chain.insert(0, block) return chain
[docs] def attached_chain(self): chain = [] block = self while block.next_id is not None: block = block.attached_block() chain.append(block) return chain
[docs] def complete_chain(self): return self.previous_chain() + [self] + self.attached_chain()
[docs] def duplicate_single_block(self): new_block = ProjectBody.Block(**self.__dict__) new_block.parent_id = None new_block.next_id = None new_block._generate_new_id() self.sprite.blocks.append(new_block) return new_block
[docs] def duplicate_chain(self): blocks_to_dupe = [self] + self.attached_chain() duped = [] for i in range(len(blocks_to_dupe)): new_block = ProjectBody.Block(**blocks_to_dupe[i].__dict__) new_block.parent_id = None new_block.next_id = None new_block._generate_new_id() if i != 0: new_block.parent_id = duped[i - 1].id duped[i - 1].next_id = new_block.id duped.append(new_block) self.sprite.blocks += duped return duped
[docs] def _reattach(self, new_parent_id, new_next_id_of_old_parent): if self.parent_id is not None: old_parent_block = self.sprite.block_by_id(self.parent_id) self.sprite.blocks.remove(old_parent_block) old_parent_block.next_id = new_next_id_of_old_parent self.sprite.blocks.append(old_parent_block) self.parent_id = new_parent_id if self.parent_id is not None: new_parent_block = self.sprite.block_by_id(self.parent_id) self.sprite.blocks.remove(new_parent_block) new_parent_block.next_id = self.id self.sprite.blocks.append(new_parent_block) self.topLevel = new_parent_id is None
[docs] def reattach_single_block(self, new_parent_id): old_parent_id = str(self.parent_id) self._reattach(new_parent_id, self.next_id) if self.next_id is not None: old_next_block = self.sprite.block_by_id(self.next_id) self.sprite.blocks.remove(old_next_block) old_next_block.parent_id = old_parent_id self.sprite.blocks.append(old_next_block) self.next_id = None
[docs] def reattach_chain(self, new_parent_id): self._reattach(new_parent_id, None)
[docs] def delete_single_block(self): self.sprite.blocks.remove(self) self.reattach_single_block(None, self.next_id)
[docs] def delete_chain(self): self.sprite.blocks.remove(self) self.reattach_chain(None) to_delete = self.attached_chain() for block in to_delete: self.sprite.blocks.remove(block)
[docs] def inputs_as_blocks(self): if self.input_data is None: return None inputs = [] for input in self.input_data: inputs.append(self.sprite.block_by_id(self.input_data[input][1]))
[docs] class Sprite(BaseProjectBodyComponent):
[docs] def from_json(self, data: dict): self.isStage = data["isStage"] self.name = data["name"] self.id = self.name # Sprites are uniquely identifiable through their name self.variables = [] for variable_id in data["variables"]: # self.lists is a dict with the list_id as key and info as value pvar = ProjectBody.Variable(id=variable_id) pvar.from_json(data["variables"][variable_id]) self.variables.append(pvar) self.lists = [] for list_id in data["lists"]: # self.lists is a dict with the list_id as key and info as value plist = ProjectBody.List(id=list_id) plist.from_json(data["lists"][list_id]) self.lists.append(plist) self.broadcasts = data["broadcasts"] self.blocks = [] for block_id in data["blocks"]: # self.blocks is a dict with the block_id as key and block content as value if isinstance(data["blocks"][block_id], dict): # Sometimes there is a weird list at the end of the blocks list. This list is ignored block = ProjectBody.Block(id=block_id, sprite=self) block.from_json(data["blocks"][block_id]) self.blocks.append(block) self.comments = data["comments"] self.currentCostume = data["currentCostume"] self.costumes = [] load_components(data["costumes"], ProjectBody.Asset, self.costumes) # load lists self.sounds = [] load_components(data["sounds"], ProjectBody.Asset, self.sounds) # load lists self.volume = data["volume"] self.layerOrder = data["layerOrder"] if self.isStage: self.tempo = data.get("tempo", None) self.videoTransparency = data.get("videoTransparency", None) self.videoState = data.get("videoState", None) self.textToSpeechLanguage = data.get("textToSpeechLanguage", None) else: self.visible = data.get("visible", None) self.x = data.get("x", None) self.y = data.get("y", None) self.size = data.get("size", None) self.direction = data.get("direction", None) self.draggable = data.get("draggable", None) self.rotationStyle = data.get("rotationStyle", None)
[docs] def to_json(self): return_data = dict(self.__dict__) if "projectBody" in return_data: return_data.pop("projectBody") return_data.pop("id") return_data["variables"] = {} for variable in self.variables: return_data["variables"][variable.id] = variable.to_json() return_data["lists"] = {} for plist in self.lists: return_data["lists"][plist.id] = plist.to_json() return_data["blocks"] = {} for block in self.blocks: return_data["blocks"][block.id] = block.to_json() return_data["costumes"] = [custome.to_json() for custome in self.costumes] return_data["sounds"] = [sound.to_json() for sound in self.sounds] return return_data
[docs] def variable_by_id(self, variable_id): matching = list(filter(lambda x: x.id == variable_id, self.variables)) if matching == []: return None return matching[0]
[docs] def list_by_id(self, list_id): matching = list(filter(lambda x: x.id == list_id, self.lists)) if matching == []: return None return matching[0]
[docs] def variable_by_name(self, variable_name): matching = list(filter(lambda x: x.name == variable_name, self.variables)) if matching == []: return None return matching[0]
[docs] def list_by_name(self, list_name): matching = list(filter(lambda x: x.name == list_name, self.lists)) if matching == []: return None return matching[0]
[docs] def block_by_id(self, block_id): matching = list(filter(lambda x: x.id == block_id, self.blocks)) if matching == []: return None return matching[0]
# -- Functions to modify project contents --
[docs] def create_sound(self, asset_content, *, name="new sound", dataFormat="mp3", rate=4800, sampleCount=4800): data = asset_content if isinstance(asset_content, bytes) else open(asset_content, "rb").read() new_asset_id = hashlib.md5(data).hexdigest() new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, rate=rate, sampleCound=sampleCount, md5ext=new_asset_id + "." + dataFormat, filename=new_asset_id + "." + dataFormat) self.sounds.append(new_asset) if not hasattr(self, "projectBody"): print( "Warning: Since there's no project body connected to this object, the new sound asset won't be uploaded to Scratch") elif self.projectBody._session is None: print( "Warning: Since there's no login connected to this object, the new sound asset won't be uploaded to Scratch") else: self._session.upload_asset(data, asset_id=new_asset_id, file_ext=dataFormat) return new_asset
[docs] def create_costume(self, asset_content, *, name="new costume", dataFormat="svg", rotationCenterX=0, rotationCenterY=0): data = asset_content if isinstance(asset_content, bytes) else open(asset_content, "rb").read() new_asset_id = hashlib.md5(data).hexdigest() new_asset = ProjectBody.Asset(assetId=new_asset_id, name=name, id=new_asset_id, dataFormat=dataFormat, rotationCenterX=rotationCenterX, rotationCenterY=rotationCenterY, md5ext=new_asset_id + "." + dataFormat, filename=new_asset_id + "." + dataFormat) self.costumes.append(new_asset) if not hasattr(self, "projectBody"): print( "Warning: Since there's no project body connected to this object, the new costume asset won't be uploaded to Scratch") elif self.projectBody._session is None: print( "Warning: Since there's no login connected to this object, the new costume asset won't be uploaded to Scratch") else: self._session.upload_asset(data, asset_id=new_asset_id, file_ext=dataFormat) return new_asset
[docs] def create_variable(self, name, *, value=0, is_cloud=False): new_var = ProjectBody.Variable(name=name, value=value, is_cloud=is_cloud) self.variables.append(new_var) return new_var
[docs] def create_list(self, name, *, value=[]): new_list = ProjectBody.List(name=name, value=value) self.lists.append(new_list) return new_list
[docs] def add_block(self, block, *, parent_id=None): block.parent_id = None block.next_id = None if parent_id is not None: block.reattach_single_block(parent_id) self.blocks.append(block)
[docs] def add_block_chain(self, block_chain, *, parent_id=None): parent = parent_id for block in block_chain: self.add_block(block, parent_id=parent) parent = str(block.id)
[docs] class Variable(BaseProjectBodyComponent):
[docs] def __init__(self, **entries): super().__init__(**entries) if self.id is None: self._generate_new_id()
[docs] def from_json(self, data: list): self.name = data[0] self.saved_value = data[1] self.is_cloud = len(data) == 3
[docs] def to_json(self): if self.is_cloud: return [self.name, self.saved_value, True] else: return [self.name, self.saved_value]
[docs] def make_cloud_variable(self): self.is_cloud = True
[docs] class List(BaseProjectBodyComponent):
[docs] def __init__(self, **entries): super().__init__(**entries) if self.id is None: self._generate_new_id()
[docs] def from_json(self, data: list): self.name = data[0] self.saved_content = data[1]
[docs] def to_json(self): return [self.name, self.saved_content]
[docs] class Monitor(BaseProjectBodyComponent):
[docs] def from_json(self, data: dict): self.__dict__.update(data)
[docs] def to_json(self): return_data = dict(self.__dict__) if "projectBody" in return_data: return_data.pop("projectBody") return return_data
[docs] def target(self): if not hasattr(self, "projectBody"): print("Can't get represented object because the origin projectBody of this monitor is not saved") return if "VARIABLE" in self.params: return self.projectBody.sprite_by_name(self.spriteName).variable_by_name(self.params["VARIABLE"]) if "LIST" in self.params: return self.projectBody.sprite_by_name(self.spriteName).list_by_name(self.params["LIST"])
[docs] class Asset(BaseProjectBodyComponent):
[docs] def from_json(self, data: dict): self.__dict__.update(data) self.id = self.assetId self.filename = self.md5ext self.download_url = f"https://assets.scratch.mit.edu/internalapi/asset/{self.filename}"
[docs] def to_json(self): return_data = dict(self.__dict__) return_data.pop("filename") return_data.pop("id") return_data.pop("download_url") return return_data
[docs] def download(self, *, filename=None, dir=""): if not (dir.endswith("/") or dir.endswith("\\")): dir = dir + "/" try: if filename is None: filename = str(self.filename) response = requests.get( self.download_url, timeout=10, ) open(f"{dir}{filename}", "wb").write(response.content) except Exception: raise ( exceptions.FetchError( "Failed to download asset" ) )
[docs] def __init__(self, *, sprites=[], monitors=[], extensions=[], meta=[{"agent": None}], _session=None): # sprites are called "targets" in the initial API response self.sprites = sprites self.monitors = monitors self.extensions = extensions self.meta = meta self._session = _session
[docs] def from_json(self, data: dict): """ Imports the project data from a dict that contains the raw project json """ # Load sprites: self.sprites = [] load_components(data["targets"], ProjectBody.Sprite, self.sprites) # Save origin of sprite in Sprite object: for sprite in self.sprites: sprite.projectBody = self # Load monitors: self.monitors = [] load_components(data["monitors"], ProjectBody.Monitor, self.monitors) # Save origin of monitor in Monitor object: for monitor in self.monitors: monitor.projectBody = self # Set extensions and meta attributs: self.extensions = data["extensions"] self.meta = data["meta"]
[docs] def to_json(self): """ Returns a valid project JSON dict with the contents of this project """ return_data = {} return_data["targets"] = [sprite.to_json() for sprite in self.sprites] return_data["monitors"] = [monitor.to_json() for monitor in self.monitors] return_data["extensions"] = self.extensions return_data["meta"] = self.meta return return_data
# -- Functions to get info --
[docs] def blocks(self): return [block for sprite in self.sprites for block in sprite.blocks]
[docs] def block_count(self): return len(self.blocks())
[docs] def assets(self): return [sound for sprite in self.sprites for sound in sprite.sounds] + [costume for sprite in self.sprites for costume in sprite.costumes]
[docs] def asset_count(self): return len(self.assets())
[docs] def variable_by_id(self, variable_id): for sprite in self.sprites: r = sprite.variable_by_id(variable_id) if r is not None: return r
[docs] def list_by_id(self, list_id): for sprite in self.sprites: r = sprite.list_by_id(list_id) if r is not None: return r
[docs] def sprite_by_name(self, sprite_name): matching = list(filter(lambda x: x.name == sprite_name, self.sprites)) if matching == []: return None return matching[0]
[docs] def user_agent(self): return self.meta["agent"]
[docs] def save(self, *, filename=None, dir=""): """ Saves the project body to the given directory. Args: filename (str): The name that will be given to the downloaded file. dir (str): The path of the directory the file will be saved in. """ if not (dir.endswith("/") or dir.endswith("\\")): dir = dir + "/" if filename is None: filename = "project" filename = filename.replace(".sb3", "") with open(f"{dir}{filename}.sb3", "w") as d: json.dump(self.to_json(), d, indent=4)
[docs] def get_empty_project_pb(): pb = ProjectBody() pb.from_json(empty_project_json) return pb
[docs] def get_pb_from_dict(project_json: dict): pb = ProjectBody() pb.from_json(project_json) return pb
[docs] def _load_sb3_file(path_to_file): try: with open(path_to_file, "r") as r: return json.loads(r.read()) except Exception as e: with zipfile.ZipFile(path_to_file, 'r') as zip_ref: # Check if the file exists in the zip if "project.json" in zip_ref.namelist(): # Read the file as bytes with zip_ref.open("project.json") as file: return json.loads(file.read()) else: raise ValueError("specified sb3 archive doesn't contain project.json")
[docs] def read_sb3_file(path_to_file): pb = ProjectBody() pb.from_json(_load_sb3_file(path_to_file)) return pb
[docs] def download_asset(asset_id_with_file_ext, *, filename=None, dir=""): if not (dir.endswith("/") or dir.endswith("\\")): dir = dir + "/" try: if filename is None: filename = str(asset_id_with_file_ext) response = requests.get( "https://assets.scratch.mit.edu/" + str(asset_id_with_file_ext), timeout=10, ) open(f"{dir}{filename}", "wb").write(response.content) except Exception: raise ( exceptions.FetchError( "Failed to download asset" ) )
# The method for uploading an asset by id requires authentication and can be found in the site.session.Session class