Skip to content

Libraries#

This is a slowly updating list, of databases, supported by this library.

Class for asyncpg
class Asyncpg(db_scheduler.DatabaseBuilder):
    _pool: t.Any | None = None

    @property
    def pool(self):
        if not self._pool:
            raise ValueError("You must connect the database for use.")
        return self._pool

    def __init__(
        self, 
        username: str, 
        password: str, 
        database: str,
        host: str = "127.0.0.1",
        port: int = 5432,
        **kwargs: t.Any
        ) -> None:
        self._kwargs: dict[str, t.Any] = {"user":username, "password":password, "database": database, "host":host, "port":port}

        self._kwargs.update(kwargs)


    async def connect(self) -> None:
        self._pool = await asyncpg.create_pool(
            **self._kwargs, min_size=5, max_size=5
        )

    async def shutdown(self) -> None:
        await self.pool.close()

    async def _check_table(self) -> None:
        async with self.pool.acquire() as con:
            await con.execute("CREATE TABLE IF NOT EXISTS db_scheduler_store(name text NOT NULL, key text NOT NULL, time bigint NOT NULL, PRIMARY KEY (name, key))")

    async def add(self,timer: db_scheduler.Timer) -> None:
        await self._check_table()

        async with self.pool.acquire() as con:
            await con.execute("INSERT INTO db_scheduler_store(name, key, time) VALUES ($1, $2, $3)", timer.name, timer.key, timer.time)

    async def fetch(self, name: str, key: str) -> db_scheduler.Timer:
        await self._check_table()

        async with self.pool.acquire() as con:
            data = await con.fetchrow(  
                "SELECT * FROM db_scheduler_store WHERE name = $1 AND key = $2", name, key
            )

        if data is None:
            raise ValueError("Timer not found.")

        return db_scheduler.Timer(*data.values())

    async def fetch_all(self) -> t.Sequence[db_scheduler.Timer] | None:
        await self._check_table()

        async with self.pool.acquire() as con:
            data = await con.fetch(  
                "SELECT * FROM db_scheduler_store"
            )

        if len(data) <= 0:
            return

        timer_list: list[db_scheduler.Timer] = []
        for record in data:
            timer_list.append(db_scheduler.Timer(*record.values()))

        return timer_list

    async def delete(self, name: str, key: str) -> None:
        await self._check_table()

        async with self.pool.acquire() as con:
            await con.execute("DELETE FROM db_scheduler_store WHERE name = $1 AND key = $2", name, key)

    async def delete_all(self, name: str) -> None:
        await self._check_table()

        async with self.pool.acquire() as con:
            await con.execute("DELETE FROM db_scheduler_store WHERE name = $1", name)

    async def clear(self) -> None:
        await self._check_table()

        async with self.pool.acquire() as con:
            await con.execute("DELETE FROM db_scheduler_store")
Class for aiosqlite
class AioSqlite(db_scheduler.DatabaseBuilder):
    _connection: aiosqlite.Connection | None = None

    def __init__(self, file_location: str) -> None:
        self._file_location = file_location

    @property
    def connection(self) -> aiosqlite.Connection:
        if not self._connection:
            raise ValueError("Database must be initialized before use.")

        return self._connection

    async def connect(self) -> None:
        self._connection = await aiosqlite.connect(self._file_location)

    async def shutdown(self) -> None:
        await self.connection.close()

    async def _check_table(self) -> None:
        async with self.connection.cursor() as con:
            await con.execute("CREATE TABLE IF NOT EXISTS db_scheduler_store (name TEXT NOT NULL, key TEXT NOT NULL, time INTEGER NOT NULL, default_time INTEGER NOT NULL, status INTEGER NOT NULL, PRIMARY KEY(name, key))")

        await self.connection.commit()

    async def add(self,timer: db_scheduler.Timer) -> None:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute("INSERT INTO db_scheduler_store(name, key, time, default_time, status) VALUES (?, ?, ?, ?, ?)", (timer.name, timer.key, timer.time, timer.default_time, timer.status.value))

            await self.connection.commit()

    async def fetch(self, name: str, key: str) -> db_scheduler.Timer:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute(  
                "SELECT * FROM db_scheduler_store WHERE name = ? AND key = ?", (name, key)
            )
            await self.connection.commit()

            data = await con.fetchone()

        if data is None:
            raise ValueError("Timer not found.")

        return db_scheduler.Timer(*data)

    async def fetch_all(self) -> t.Sequence[db_scheduler.Timer]:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute(  
                "SELECT * FROM db_scheduler_store"
            )
            await self.connection.commit()

            data = await con.fetchall()

        timer_list: list[db_scheduler.Timer] = []
        for record in data:
            timer_list.append(db_scheduler.Timer(*record))

        return timer_list

    async def delete(self, name: str, key: str) -> None:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute("DELETE FROM db_scheduler_store WHERE name = ? AND key = ?", (name, key))

            await self.connection.commit()

    async def delete_all(self, name: str) -> None:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute("DELETE FROM db_scheduler_store WHERE name = ?", name)

            await self.connection.commit()

    async def clear(self) -> None:
        await self._check_table()

        async with self.connection.cursor() as con:
            await con.execute("DELETE FROM db_scheduler_store")

            await self.connection.commit()