| #!/usr/bin/env python3
|
|
|
| import os, re, json, requests, hashlib, textwrap, base64, random
|
| from datetime import datetime, timezone, timedelta
|
| import time
|
|
|
| class SD():
|
|
|
| def request(self,
|
| host = "localhost",
|
| port = 8080,
|
| mode = "txt2img",
|
| args = {}
|
| ) -> dict: # return { "msg": "", "gen": {}, "img": [] }
|
| """Request an image from Stable Diffusion server."""
|
| if mode == "txt2img":
|
| args = {
|
| # Prompting
|
| "prompt": "",
|
| "negative_prompt": "",
|
| ### Image
|
| "format": "png",
|
| "width": 512,
|
| "height": 512,
|
| "batch_count": 1,
|
| #"sample_steps": 4,
|
| "seed": -1, # negative means random
|
| ### Generation
|
| #"cfg_scale": 1.0,
|
| #"guidance": 3.5,
|
| ### Sampling
|
| #"sample_method": "euler",
|
| #"schedule": "discrete",
|
| ### Preview
|
| #"preview_interval": 1,
|
| #"preview_mode": "proj",
|
| ### Flags
|
| #"clip_on_cpu": False,
|
| #"tae_decode": False,
|
| #"vae_on_cpu": False,
|
| #"vae_tiling": True,
|
| } | args
|
| base = f"http://{host}:{port}"
|
| path = {
|
| "txt2img": "/txt2img",
|
| "result": "/result",
|
| "sample_methods": "/sample_methods",
|
| "schedules": "/schedules",
|
| "previews": "/previews",
|
| "params": "/params",
|
| "models": "/models",
|
| "model": "/model"
|
| }
|
| if not mode in path:
|
| mode = "txt2img"
|
| dbg = False
|
| rqs = 128 # max number of polls
|
| try: # generation
|
| req = requests.post(base + path[mode], data=json.dumps(args), timeout=10).json()
|
| except Exception as e:
|
| req = None
|
| msg = f"Request {repr(e)}" # error desc
|
| msg = "success"
|
| gen = {}
|
| img = None
|
| for key in ["params", "model"]:
|
| try: # get metadata
|
| gen[key] = requests.get(base + path[key], timeout=10).json()
|
| except Exception as e:
|
| gen[key] = None
|
| msg = f"Request {repr(e)}" # error desc
|
| gen["args"] = args
|
| if req and type(req) is dict:
|
| if "task_id" in req and req["task_id"]:
|
| tid = req["task_id"]
|
| job = { "status": "" }
|
| num = 0
|
| while job["status"] != "Done" and num < rqs: # poll result
|
| try: # get status
|
| num = num + 1 # poll counter
|
| job = requests.get(base + path["result"], params={"task_id": tid}, timeout=10).json()
|
| except Exception as e:
|
| job = { "status": "" }
|
| msg = f"Job error: {repr(e)}" # error desc
|
| if job and type(job) is dict and "status" in job: # progress report
|
| if dbg: print(f"Status: {job['status']}")
|
| if "data" in job and len(job["data"]) > 0:
|
| if dbg: print(f"Data: {json.dumps(job['data'], indent=2)}")
|
| else:
|
| job = { "status": "" }
|
| time.sleep(1) # wait for next poll
|
| if job and type(job) is dict and "status" in job:
|
| if job["status"] == "Done": # completed
|
| if "data" in job and len(job["data"]) > 0:
|
| img = [ base64.b64decode(itm["data"]) for itm in job["data"] ]
|
| return {
|
| "msg": msg,
|
| "gen": gen,
|
| "img": img
|
| } # return { "msg": "", "gen": {}, "img": [] }
|
|
|
| def generate(self,
|
| host = "localhost",
|
| port = 8080,
|
| desc = "",
|
| args = {},
|
| ) -> dict: # { "msg": "", "gen": {}, "img": [{ "time": "", "type". "", "data": "" }] }
|
| """Generate an image from a description of a scene for Stable Diffusion."""
|
| desc = " ".join(desc.split(None)) # scene description
|
| args = args if args and type(args) == dict else {}
|
| seed = args["seed"] if "seed" in args and args["seed"] > 0 else int(random.uniform(0, 2147483647))
|
| args = args | { "prompt": desc, "seed": seed } # overwrite prompt and invalid seed
|
| now = datetime.utcnow()
|
| img_time = now.strftime("%Y%m%dT%H%M%SZ")
|
| img_type = args["format"] if "format" in args else "png" # "bmp", "tga", "png", "jpg"
|
| req = self.request(
|
| host = host,
|
| port = port,
|
| mode = "txt2img",
|
| args = args | { "format": img_type }
|
| ) if host and port and args else None
|
| res = {
|
| "msg": req["msg"] or "Image generation failed.",
|
| "gen": req["gen"] or {},
|
| "img": []
|
| }
|
| img = req["img"]
|
| if img and type(img) is list and len(img): # got some images
|
| for idx, data in enumerate(img):
|
| if len(data):
|
| res["img"].append({
|
| "time": img_time,
|
| "type": img_type,
|
| "data": data
|
| })
|
| return res # { "msg": "", "gen": {}, "img": [{ "time": "", "type". "", "data": "" }] }
|
|
|
| def output(self,
|
| batch = None,
|
| img_path = ["."],
|
| img_tmpl = "img_{time}{bidx}.{type}",
|
| log_path = ["."],
|
| log_name = f"img_gen_log.txt" # cat "img_gen_log.txt" | jq
|
| ): # output images to file
|
| res = ""
|
| def writeFile(
|
| file_path,
|
| file_name,
|
| file_data,
|
| file_mode = "wb"
|
| ): # write data to file
|
| msg = "nothing to write"
|
| if file_path.strip() and file_name.strip():
|
| try:
|
| if not os.path.exists(file_path):
|
| os.makedirs(file_path)
|
| except IOError as e:
|
| msg = f"writeFile: Error creating directory {file_path} {repr(e)}"
|
| return msg
|
| try:
|
| full_path = os.path.join(file_path, file_name)
|
| with open(full_path, file_mode) as out: # write to file
|
| pos = out.write(file_data)
|
| if pos > 0:
|
| msg = f"writeFile: {pos} bytes written to {file_name}"
|
| else:
|
| msg = f"writeFile: No bytes written to {file_name}"
|
| except IOError as e:
|
| msg = f"writeFile: Error writing to file {full_path} {repr(e)}"
|
| return msg
|
| return msg
|
| def mimeType(ext):
|
| mime = {
|
| "bmp": "image/bmp",
|
| "tga": "image/tga",
|
| "png": "image/png",
|
| "jpg": "image/jpeg",
|
| }
|
| return mime[ext] if ext in mime else None
|
| img_path = os.path.abspath(os.path.join(*(img_path or ["*"])))
|
| log_path = os.path.abspath(os.path.join(*(log_path or ["*"])))
|
| if batch and "msg" in batch and "gen" in batch and "img" in batch and img_path and img_tmpl:
|
| msg = batch["msg"]
|
| gen = batch["gen"]
|
| img = batch["img"]
|
| img_seed = 0
|
| img_desc = ""
|
| if gen and type(gen) is dict and "args" in gen:
|
| args = gen["args"]
|
| if args and type(args) is dict and "seed" in args:
|
| img_seed = args["seed"]
|
| if args and type(args) is dict and "prompt" in args:
|
| img_desc = args["prompt"]
|
| # img_desc = re.sub(r"\s+", " ", img_desc).strip()
|
| if img and type(img) is list and len(img):
|
| img_list = []
|
| for idx, itm in enumerate(img):
|
| img_time = itm["time"]
|
| img_type = itm["type"]
|
| img_data = itm["data"]
|
| img_name = img_tmpl.format(
|
| time = img_time,
|
| bidx = f"_{idx}",
|
| seed = img_seed,
|
| type = img_type
|
| )
|
| img_list.append(img_name)
|
| img_stat = writeFile(
|
| file_path = img_path,
|
| file_name = img_name,
|
| file_data = img_data,
|
| file_mode = "wb"
|
| )
|
| res += ", " if len(res) else ""
|
| res += img_stat
|
| log_stat = writeFile(
|
| file_path = log_path,
|
| file_name = log_name,
|
| file_data = json.dumps({
|
| "time": img_time,
|
| "name": img_list,
|
| "seed": img_seed,
|
| "desc": img_desc,
|
| "type": img_type,
|
| "mime": mimeType(img_type),
|
| "data": {
|
| "params": None,
|
| "model": None,
|
| "args": None
|
| } | gen
|
| }) + "\n",
|
| file_mode = "a"
|
| )
|
| res += ", " if len(res) else ""
|
| res += log_stat
|
| return res
|
|
|
| class SDImage(SD):
|
|
|
| def rndText(self, cats={}, txts=[]): # random text from grammar variants.
|
| text = ""
|
| if cats and txts:
|
| text = random.choice(txts)
|
| toks = re.findall(r'\{(\w+)\}', text)
|
| for tok in toks:
|
| if tok in cats:
|
| text = text.replace(f"{{{tok}}}", random.choice(cats[tok]), 1)
|
| return " ".join(text.split(None))
|
|
|
| def runJobs(self,
|
| cats = {},
|
| txts = [],
|
| jobs = 1,
|
| host = "localhost",
|
| port = 8080,
|
| args = {}
|
| ):
|
| for job in range(0, jobs):
|
| res = ""
|
| desc = self.rndText(cats = cats, txts = txts)
|
| print("Generating:", desc)
|
| batch = self.generate(
|
| host = host or "localhost",
|
| port = port or 8080,
|
| desc = desc,
|
| args = args
|
| )
|
| res += batch["msg"]
|
| res += ", " if len(res) else ""
|
| res += self.output(
|
| batch = batch,
|
| img_path = ["."],
|
| img_tmpl = "img_{time}{bidx}.{type}",
|
| log_path = ["."],
|
| log_name = f"img_gen_log.txt" # cat "img_gen_log.txt" | jq
|
| )
|
| print("result:", repr({'job': job, 'desc': desc, 'res': res}))
|
|
|
|
|
| sdi = SDImage()
|
| sdi.runJobs(
|
| cats={ # category choices to randomly choose from.
|
| #"adjs": ["bouncy", "twisted", "cracked", "bronze", "malachite", "stone", "classic", "ornate", "decorative", "psychedelic", "shrinkwrapped", "plantlike", "mechanical", "rubber", "fluffy", "furry", "metallic", "rusted", "studded", "wooden", "dusty", "dirty", "shiny", "carved", "steampunk", "futuristic"],
|
| #"objs": ["dog", "cat", "boat", "car", "truck", "goat", "fish", "tiger", "house", "elephant", "pig", "wasp", "beetle", "shrimp", "crab", "jet airliner", "fighter jet", "bus", "bicycle", "lizard", "octopus", "squid", "crocodile", "snake"],
|
| #"locs": ["at the beach", "in space", "in a forest", "in a tropical jungle", "in a church", "at home", "in the street", "in a field", "in a cave", "in a garden", "in a castle", "in a factory", "in a workshop", "underwater"],
|
| #"stys": ["realistic photo", "artistic style", "oil painting"],
|
| "adjs": [
|
| "bouncy", "spikey", "terracota", "foil", "skeletal", "intricate",
|
| "aztec", "roman", "spiney", "demonic", "broken", "bubblewrapped",
|
| "bubbly", "sparky", "derelict", "drab", "golden", "chocolate",
|
| "cabbage", "chainmail", "leafy", "aluminium", "mouldy",
|
| "camouflaged", "cardboard", "paper", "fabric", "feathered",
|
| "clawed", "toothy", "twisted", "cracked", "bronze", "cast iron",
|
| "malachite", "slimy", "plastic", "wax", "plasticine", "armoured",
|
| "insectoid", "muddy", "crystaline", "encrusted", "melted", "stone",
|
| "classic", "ornate", "decorative", "psychedelic", "shrinkwrapped",
|
| "plantlike", "granite", "obsidian", "pyrite", "satin", "sandstone",
|
| "carbon fibre", "synthetic", "cloth", "clockwork", "mechanical",
|
| "rubber", "fluffy", "furry", "metallic", "rusted", "studded",
|
| "wooden", "dusty", "dirty", "shiny", "carved", "steampunk",
|
| "futuristic", "papier mache"
|
| ],
|
| "objs": [
|
| "dog", "horse", "cat", "swan", "eagle", "cow", "limousine",
|
| "snowmobile", "snowplough", "locomotive", "ceramic", "giraffe",
|
| "panda", "garbage truck", "cement mixer truck", "firetruck",
|
| "towtruck", "pickup truck", "flatbed truck",
|
| "backhoe loader", "telehandler", "aerial work platform", "leopard",
|
| "hedgehog", "duck", "mouse", "jeep", "quadbike", "motorbike",
|
| "worms", "battle tank", "jetski", "hydrofoil", "speedboat",
|
| "catamaran", "sailing ship", "yacht", "bear",
|
| "armoured personel carrier", "snail", "hatchback car",
|
| "mobile crane", "gantry crane", "shpping container", "crate",
|
| "skateboard", "nautilus", "sheep", "catepillars", "SUV", "shark",
|
| "eels", "boat", "truck", "goat", "supercar", "owl", "automobile",
|
| "tractor", "tanker truck", "tipper truck", "ship", "fishing boat",
|
| "f1 car", "convertible car", "excavator", "bulldozer", "campervan",
|
| "fish", "tiger", "semi trailer truck", "box truck", "van", "house",
|
| "hippapotamus", "rhinocerous", "pirahna", "spinosaurus",
|
| "dimetrodon", "monkey", "gorrila", "trout", "salamander",
|
| "jellyfish", "wolf", "lemur", "chameleon", "gecko", "frog", "ant",
|
| "fox", "rabbit", "triceratops", "tyranosaurus", "elephant", "pig",
|
| "wasp", "manta ray", "diatoms", "beetle", "earwig", "sea urchin",
|
| "shrimp", "lobster", "angler fish", "starfish", "crab",
|
| "jet airliner", "biplane", "anaconda", "fighter jet", "bus",
|
| "bicycle", "lizard", "octopus", "squid", "crocodile", "snake",
|
| "cobra"
|
| ],
|
| "locs": [
|
| "at the beach", "in a kitchen", "in a museum", "in the wilderness",
|
| "in a valley", "in a canyon", "in a desert",
|
| "in a mountainous landscape", "in the arctic", "in a temple",
|
| "in a cityscape", "in a suburban street", "by a river", "in a pond",
|
| "in a barn", "in a living room", "in a garden", "in a greenhouse",
|
| "in an amusement park", "in a harbour", "in space", "in a quarry",
|
| "in a scrapyard", "in a rubbish tip", "in a rural village",
|
| "by a lakeside", "in a forest", "in an airport", "under a bridge",
|
| "in a tunnel", "at a tropical beach", "in a tropical jungle",
|
| "in a church", "at home", "in the street", "in a field",
|
| "in a cave", "in a garden", "in a castle", "in a warehouse",
|
| "in a luxury hotel", "in a factory", "in a workshop", "underwater"
|
| ],
|
| "view": [
|
| "rear view", "side view", "overhead view", "close up", "front view",
|
| "rear side view", "front side view", "", "", "", "", ""
|
| ],
|
| },
|
| txts=[ # grammar variants using category random choices.
|
| # "{adjs} {adjs} {objs} {locs}, {stys}",
|
| "{adjs} {objs} {view} {locs}",
|
| "{adjs} {adjs} {objs} {view} {locs}",
|
| ],
|
| host = "192.168.X.Y",
|
| port = 10000,
|
| jobs = 2,
|
| args = {
|
| # Prompting
|
| "prompt": "", # overwritten
|
| "negative_prompt": "",
|
| ### Image
|
| "format": "png",
|
| "width": 1024,
|
| "height": 1024,
|
| "batch_count": 2,
|
| "sample_steps": 4,
|
| "seed": -1, # negative means random
|
| ### Generation
|
| "cfg_scale": 1.0,
|
| "guidance": 3.5,
|
| ### Sampling
|
| "sample_method": "euler",
|
| "schedule": "discrete",
|
| ### Preview
|
| "preview_interval": 1,
|
| "preview_mode": "proj",
|
| ### Flags
|
| "clip_on_cpu": False,
|
| "tae_decode": False,
|
| "vae_on_cpu": False,
|
| "vae_tiling": True,
|
| }
|
| )
|
| # cat stable-diffusion-generation-log.txt | jq
|