6  Asyncio Walkthrough

6.1 파일: areq.py

여러 페이지의 HTML에 포함된 링크를 비동기식으로 가져옵니다.

#!/usr/bin/env python3
# areq.py

"""Asynchronously get links embedded in multiple pages' HTML."""

import asyncio
import logging
import re
import sys
import urllib.error
import urllib.parse
from typing import IO

import aiofiles
import aiohttp
from aiohttp import ClientSession

# Configure the root logger once at import time: every record at DEBUG level
# or above is written to stderr, timestamped with the time of day only.
logging.basicConfig(
    format="%(asctime)s %(levelname)s:%(name)s: %(message)s",
    level=logging.DEBUG,
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)
# Logger used by every function in this script.
logger = logging.getLogger("areq")
# Disable chardet's prober logger outright; presumably it is too chatty once
# the root level is DEBUG - TODO confirm.
logging.getLogger("chardet.charsetprober").disabled = True

# Captures (non-greedily) the value of each double-quoted href attribute.
# Single-quoted or unquoted href values are not matched by this pattern.
HREF_RE = re.compile(r'href="(.*?)"')


async def fetch_html(url: str, session: ClientSession, **kwargs) -> str:
    """Issue a GET request for `url` and return the response body as text.

    Args:
        url: Address of the page to fetch.
        session: Open client session used to send the request.  It is
            neither created nor closed here; the caller owns its lifetime.
        **kwargs: Passed through unchanged to `session.request()`.

    Returns:
        The decoded response body.

    Raises:
        Whatever the request, `raise_for_status()` (status >= 400) or the
        body read raises.  Nothing is caught here on purpose, so that the
        caller decides how to handle failures.
    """
    # Use the request as an async context manager so the response is always
    # released, even when raise_for_status() or text() raises or the
    # surrounding task is cancelled part-way through reading the body.
    async with session.request(method="GET", url=url, **kwargs) as resp:
        resp.raise_for_status()  # raise if status >= 400
        logger.info("Got response [%s] for URL: %s", resp.status, url)
        html = await resp.text()  # For bytes: resp.read()

    # Don't close the session; let the caller decide when to do that.
    return html


async def parse(url: str, session: ClientSession, **kwargs) -> set:
    """Return the set of absolute links referenced by the page at `url`.

    The page is downloaded with `fetch_html()`; `session` and `kwargs` are
    forwarded to it.  Any exception raised by the download is logged and
    swallowed, in which case an empty set is returned.  Each match of
    `HREF_RE` in the HTML is resolved against `url` before being collected.
    """
    links = set()
    try:
        html = await fetch_html(url=url, session=session, **kwargs)
    except (
        aiohttp.ClientError,
        aiohttp.http_exceptions.HttpProcessingError,
    ) as exc:
        # Not every aiohttp error carries these attributes, hence getattr.
        status = getattr(exc, "status", None)
        message = getattr(exc, "message", None)
        logger.error("aiohttp exception for %s [%s]: %s", url, status, message)
        return links
    except Exception as exc:
        # Errors from other libraries (e.g. chardet or yarl) end up here;
        # logger.exception attaches the full traceback to the record.
        logger.exception(
            "Non-aiohttp exception occured:  %s", getattr(exc, "__dict__", {})
        )
        return links

    # Download succeeded.  The scan below is plain synchronous work; the
    # request/response cycle above is where the time goes.
    for href in HREF_RE.findall(html):
        try:
            # Resolve relative links against the page URL.
            absolute = urllib.parse.urljoin(url, href)
        except (urllib.error.URLError, ValueError):
            logger.exception("Error parsing URL: %s", href)
            continue
        links.add(absolute)
    logger.info("Found %d links for %s", len(links), url)
    return links


async def write_one(file: IO, url: str, **kwargs) -> None:
    """Append every link parsed from `url` to `file`, one row per link.

    `file` is handed straight to `aiofiles.open()` in append mode, and
    `kwargs` are forwarded to `parse()`.  Each row is written as
    "<url><TAB><link>".  When `parse()` yields nothing the file is never
    opened.
    """
    links = await parse(url=url, **kwargs)
    if links:
        async with aiofiles.open(file, "a") as out:
            for link in links:
                await out.write(f"{url}\t{link}\n")
            logger.info("Wrote results for source URL: %s", url)


async def bulk_crawl_and_write(file: IO, urls: set, **kwargs) -> None:
    """Crawl every URL in `urls` concurrently, writing results to `file`.

    A single `ClientSession` is opened for the whole batch and shared by all
    `write_one()` coroutines; `kwargs` are forwarded to each of them.
    """
    async with ClientSession() as session:
        pending = [
            write_one(file=file, url=url, session=session, **kwargs)
            for url in urls
        ]
        # see also: return_exceptions=True
        await asyncio.gather(*pending)


if __name__ == "__main__":
    import pathlib

    # Explicit check instead of `assert`, which is stripped under `python -O`.
    if sys.version_info < (3, 7):
        raise SystemExit("Script requires Python 3.7+.")
    here = pathlib.Path(__file__).parent

    # One URL per line.  Blank lines (e.g. a trailing newline at the end of
    # the file) are skipped so that an empty string is never crawled.
    with open(here.joinpath("urls.txt")) as infile:
        urls = {line.strip() for line in infile if line.strip()}

    # Header - just a single, initial row-write.  Opening with "w" also
    # truncates any output left over from a previous run.
    outpath = here.joinpath("foundurls.txt")
    with open(outpath, "w") as outfile:
        outfile.write("source_url\tparsed_url\n")

    asyncio.run(bulk_crawl_and_write(file=outpath, urls=urls))

6.2 파일: asyncq.py

#!/usr/bin/env python3
# asyncq.py

import asyncio
import itertools as it
import os
import random
import time


async def makeitem(size: int = 5) -> str:
    """Return `size` random bytes from os.urandom as a hex string."""
    raw = os.urandom(size)
    return raw.hex()


async def seconds() -> float:
    """Return the current time.perf_counter() reading."""
    reading = time.perf_counter()
    return reading


async def randint(a: int, b: int) -> int:
    """Return random.randint(a, b), i.e. an integer N with a <= N <= b."""
    value = random.randint(a, b)
    return value


async def randsleep(a: int = 1, b: int = 5, caller=None) -> None:
    """Sleep for a random whole number of seconds between `a` and `b`.

    When `caller` is truthy, the chosen duration is announced on stdout
    first, prefixed with `caller`.
    """
    delay = await randint(a, b)
    if caller:
        print(f"{caller} sleeping for {delay} seconds.")
    await asyncio.sleep(delay)


async def produce(name: int, q: asyncio.Queue) -> None:
    """Put between 1 and 5 timestamped random items on `q`.

    Before each put the producer sleeps for a random interval.  Every queue
    entry is an `(item, timestamp)` tuple, where the timestamp is the
    `seconds()` reading taken just before the put.
    """
    batch = await randint(1, 5)
    for _ in range(batch):  # The loop itself is synchronous.
        await randsleep(caller=f"Producer {name}")
        item = await makeitem()
        stamp = await seconds()
        await q.put((item, stamp))
        print(f"Producer {name} added <{item}> to queue.")


async def consume(name: int, q: asyncio.Queue) -> None:
    """Take `(item, timestamp)` entries off `q` forever.

    Each round sleeps for a random interval, fetches one entry, prints how
    long the entry waited since it was stamped, and marks it done.  The loop
    has no exit of its own and only stops when the coroutine is cancelled.
    """
    while True:
        await randsleep(caller=f"Consumer {name}")
        item, stamped_at = await q.get()
        received_at = await seconds()
        waited = received_at - stamped_at
        print(f"Consumer {name} got element <{item}> in {waited:0.5f} seconds.")
        # Lets q.join() account for this entry.
        q.task_done()


async def main(nprod: int, ncon: int):
    """Run `nprod` producers and `ncon` consumers over one shared queue.

    Waits until all producers have finished and every queued entry has been
    marked done, then cancels the consumers, which otherwise loop forever.
    """
    q = asyncio.Queue()

    producers = []
    for n in range(nprod):
        producers.append(asyncio.create_task(produce(n, q)))

    consumers = []
    for n in range(ncon):
        consumers.append(asyncio.create_task(consume(n, q)))

    await asyncio.gather(*producers)
    # Implicitly awaits the consumers too: blocks until the queue is drained.
    await q.join()
    for worker in consumers:
        worker.cancel()


if __name__ == "__main__":
    import argparse

    # Fixed seed so that repeated runs draw the same random numbers.
    random.seed(444)

    cli = argparse.ArgumentParser()
    cli.add_argument("-p", "--nprod", type=int, default=5)
    cli.add_argument("-c", "--ncon", type=int, default=10)
    # The parsed option names (nprod, ncon) match main()'s parameters.
    options = vars(cli.parse_args())

    began = time.perf_counter()
    asyncio.run(main(**options))
    elapsed = time.perf_counter() - began
    print(f"Program completed in {elapsed:0.5f} seconds.")

6.3 파일: chained.py

#!/usr/bin/env python3
# chained.py

import asyncio
import random
import time


async def randint(a: int, b: int) -> int:
    """Awaitable wrapper that returns random.randint(a, b)."""
    picked = random.randint(a, b)
    return picked


async def part1(n: int) -> str:
    """Sleep for a random 0-10 seconds, then return the string "result<n>-1"."""
    delay = await randint(0, 10)
    print(f"part1({n}) sleeping for {delay} seconds.")
    await asyncio.sleep(delay)
    outcome = f"result{n}-1"
    print(f"Returning part1({n}) == {outcome}.")
    return outcome


async def part2(n: int, arg: str) -> str:
    """Sleep for a random 0-10 seconds, then return a string built from `arg`.

    The result has the form "result<n>-2 derived from <arg>".
    """
    delay = await randint(0, 10)
    # `{n, arg}` formats the tuple (n, arg), so the output reads like a call.
    print(f"part2{n, arg} sleeping for {delay} seconds.")
    await asyncio.sleep(delay)
    outcome = f"result{n}-2 derived from {arg}"
    print(f"Returning part2{n, arg} == {outcome}.")
    return outcome


async def chain(n: int) -> None:
    """Run part1 then part2 for `n` and print the result with its duration.

    part2 is awaited only after part1 has returned, because it takes
    part1's result as its argument.
    """
    began = time.perf_counter()
    first = await part1(n)
    second = await part2(n, first)
    took = time.perf_counter() - began
    print(f"-->Chained result{n} => {second} (took {took:0.2f} seconds).")


async def main(*args):
    """Run one chain() per positional argument, all concurrently."""
    pipelines = [chain(n) for n in args]
    await asyncio.gather(*pipelines)


if __name__ == "__main__":
    import sys

    # Fixed seed so that repeated runs draw the same delays.
    random.seed(444)

    # With no command-line arguments, fall back to three default chains;
    # otherwise every argument is converted to int.
    if len(sys.argv) == 1:
        args = [1, 2, 3]
    else:
        args = map(int, sys.argv[1:])

    began = time.perf_counter()
    asyncio.run(main(*args))
    total = time.perf_counter() - began
    print(f"Program finished in {total:0.2f} seconds.")

6.4 파일: countasync.py

#!/usr/bin/env python3
# countasync.py

import asyncio


async def count():
    """Print "One", yield to the event loop for one second, print "Two"."""
    pause = 1  # seconds
    print("One")
    await asyncio.sleep(pause)
    print("Two")


async def main():
    """Run three count() coroutines concurrently and wait for all of them."""
    jobs = [count() for _ in range(3)]
    await asyncio.gather(*jobs)


if __name__ == "__main__":
    import time

    # Time the whole run, including event-loop start-up and shutdown.
    began = time.perf_counter()
    asyncio.run(main())
    elapsed = time.perf_counter() - began
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

6.5 파일: countsync.py

#!/usr/bin/env python3
# countsync.py

import time


def count():
    """Print "One", block the calling thread for one second, print "Two"."""
    pause = 1  # seconds
    print("One")
    time.sleep(pause)
    print("Two")


def main():
    """Call count() three times, one after another."""
    remaining = 3
    while remaining:
        count()
        remaining -= 1


if __name__ == "__main__":
    # Time the three sequential count() calls made by main().
    began = time.perf_counter()
    main()
    elapsed = time.perf_counter() - began
    print(f"{__file__} executed in {elapsed:0.2f} seconds.")

6.6 파일: phases.py

#!/usr/bin/env python3
# phases.py

import asyncio


async def phase1(callerid: str):
    """Announce the call, sleep for two seconds, and return "result1"."""
    print(f"phase 1 called from {callerid}")
    pause = 2  # seconds
    await asyncio.sleep(pause)
    return "result1"


async def phase2(callerid: str, arg: str):
    """Announce the call, sleep for two seconds, and return a string from `arg`.

    The result has the form "result2 derived from <arg>".
    """
    print(f"phase 2 called from {callerid}")
    pause = 2  # seconds
    await asyncio.sleep(pause)
    # `arg` is an ordinary value supplied by the caller, so nothing needs to
    # be awaited to build the result.
    derived = f"result2 derived from {arg}"
    return derived


async def outer(callerid: str):
    """Run phase1 then phase2 for `callerid` and return both results.

    phase2 receives phase1's result, so the two are awaited in sequence.
    Returns the tuple `(phase1_result, phase2_result)`.
    """
    print(f"outer called from {callerid}")
    first = await phase1(callerid)
    second = await phase2(callerid, first)
    return first, second


async def main():
    """Run outer() concurrently for callers "A", "B" and "C".

    Returns the list of their results as produced by asyncio.gather().
    """
    jobs = [outer(callerid) for callerid in "ABC"]
    return await asyncio.gather(*jobs)


if __name__ == "__main__":
    # Run main() to completion on a fresh event loop.  Its return value is
    # discarded; the only visible output is what the coroutines print.
    asyncio.run(main())

6.7 파일: rand.py

#!/usr/bin/env python3
# rand.py

import asyncio
import random

# ANSI colour escape sequences used to tint each coroutine's output.
# Index 0 switches colouring off again; the remaining entries are, in
# order, cyan (36), red (91) and magenta (35).
c = tuple(f"\033[{code}m" for code in (0, 36, 91, 35))


async def randint(a: int, b: int) -> int:
    """Coroutine form of random.randint: return an integer in [a, b]."""
    drawn = random.randint(a, b)
    return drawn


async def makerandom(idx: int, threshold: int = 6) -> int:
    """Draw integers from 0-10 until one exceeds `threshold`, and return it.

    Args:
        idx: Identifier of this coroutine.  It selects the output colour and
            sets the pause between retries (`idx + 1` seconds).
        threshold: A draw is accepted only when it is strictly greater than
            this value.  Draws never exceed 10, so a threshold of 10 or more
            makes the loop run forever.

    Returns:
        The first drawn integer greater than `threshold`.
    """
    # Entry 0 of `c` is the reset code; cycle through the remaining entries
    # so that every idx maps to a valid colour.  A plain c[idx + 1] raises
    # IndexError as soon as idx + 1 runs past the end of the palette.
    color = c[1 + idx % (len(c) - 1)]
    print(color + f"Initiated makerandom({idx}).")
    i = await randint(0, 10)
    while i <= threshold:
        print(color + f"makerandom({idx}) == {i} too low; retrying.")
        await asyncio.sleep(idx + 1)
        i = await randint(0, 10)
    # Reset the terminal colour after the final message.
    print(color + f"---> Finished: makerandom({idx}) == {i}" + c[0])
    return i


async def main():
    """Run three makerandom() coroutines concurrently and return their results.

    Coroutine `i` (0, 1, 2) is given the threshold `10 - i - 1`, i.e. 9, 8
    and 7 respectively.
    """
    jobs = []
    for i in range(3):
        jobs.append(makerandom(i, 10 - i - 1))
    return await asyncio.gather(*jobs)


if __name__ == "__main__":
    # Fixed seed so that repeated runs draw the same numbers.
    random.seed(444)
    results = asyncio.run(main())
    r1, r2, r3 = results
    print()
    print(f"r1: {r1}, r2: {r2}, r3: {r3}")