The Query class

When you call a script object, it returns an instance of the Query class, which holds the code from the script file and the parameters passed to the script call.

Queries are executed lazily: the query runs against the DBMS only when you call a method of the query object or iterate over it.
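The lazy behaviour described above can be illustrated with a small stand-in. The sketch below uses only the standard library sqlite3 module; the class LazyQuery and its executions counter are hypothetical names for this illustration and are not part of quma.

```python
import sqlite3


class LazyQuery:
    """Hypothetical stand-in for a lazily executed query (not quma's code)."""

    def __init__(self, conn, sql, params=()):
        self.conn = conn
        self.sql = sql
        self.params = params
        self.executions = 0  # counts how often the SQL was sent to the database

    def _execute(self):
        self.executions += 1
        return self.conn.execute(self.sql, self.params)

    def all(self):
        # Calling a method triggers the execution.
        return self._execute().fetchall()

    def __iter__(self):
        # Starting an iteration triggers the execution as well.
        return iter(self._execute())


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT, city TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [("User 1", "City 1"), ("User 2", "City 1"), ("User 3", "City 2")],
)

query = LazyQuery(
    conn, "SELECT name FROM users WHERE city = ? ORDER BY name", ("City 1",)
)
executed_before = query.executions  # creating the object has not run anything
names = [row[0] for row in query]   # iterating runs the query
executed_after = query.executions
```

Creating the object only stores the SQL and its parameters; the database is not contacted until the loop starts.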

Getting multiple rows from a query

You can either iterate directly over the query object or call its all() method to get a list of all the rows.

with db.cursor as cur:
    result = cur.users.by_city(city='City 1')
    for row in result:
        print(row.name)

    # calling the .all() method to get a materialized list/tuple
    user_list = cur.users.by_city(city='City 1').all()
    # is a bit faster than
    user_list = list(cur.users.by_city(city='City 1'))

When calling all(), MySQL and MariaDB return a tuple, while PostgreSQL and SQLite return a list.

Note

If you are using PyPy with the sqlite3 driver, the cast using the list() function does not currently work and will always result in an empty list.

Getting a single row

If you know there will be only one row in the result of a query, you can use the one() method to get it. quma will raise a DoesNotExistError if there is no row in the result and a MultipleRowsError if there is more than one row.

from quma import (
    DoesNotExistError,
    MultipleRowsError,
)
...

with db.cursor as cur:
    try:
        user = cur.users.by_id(id=13).one()
    except DoesNotExistError:
        print('The user does not exist')
    except MultipleRowsError:
        print('There are multiple users with the same id')

DoesNotExistError and MultipleRowsError are also attached to the Database class, so you can access them from the db instance. For example:

with db.cursor as cur:
    try:
        user = cur.users.by_id(id=13).one()
    except db.DoesNotExistError:
        print('The user does not exist')
    except db.MultipleRowsError:
        print('There are multiple users with the same id')
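The contract of one() can be sketched on top of a plain DBAPI cursor. The following is an illustration with the standard library sqlite3 module, not quma's implementation; the two exception classes are local stand-ins for quma's exceptions of the same name, and the helpers one() and describe() are hypothetical.

```python
import sqlite3


class DoesNotExistError(Exception):
    """Local stand-in for quma's exception of the same name."""


class MultipleRowsError(Exception):
    """Local stand-in for quma's exception of the same name."""


def one(cursor):
    """Return the only row of an executed cursor or raise an error."""
    rows = cursor.fetchmany(2)  # two rows are enough to detect a duplicate
    if not rows:
        raise DoesNotExistError()
    if len(rows) > 1:
        raise MultipleRowsError()
    return rows[0]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER, name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?, ?)",
    [(13, "User 13"), (14, "User 14a"), (14, "User 14b")],
)


def describe(user_id):
    """Map the three possible outcomes of one() to a string."""
    cursor = conn.execute("SELECT name FROM users WHERE id = ?", (user_id,))
    try:
        return one(cursor)[0]
    except DoesNotExistError:
        return "missing"
    except MultipleRowsError:
        return "ambiguous"
```

Here describe(13) yields the user's name, while an unknown id and a duplicated id end up in the two except branches.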

It is also possible to get a single row by accessing its index on the result set:

user = cur.users.by_id(id=13)[0]
# or
users = cur.users.by_id(id=13)
user = users[0]

If you want the first row of a result set that may contain more than one row or none at all, you can use the first() method:

# "user" will be None if there are no rows in the result.
user = cur.users.all().first()

The value() method invokes the one() method and, on success, returns the value of the first column of the row (i.e. fetchall()[0][0]). This comes in handy if you are using a RETURNING clause, for example, or if the query returns the last inserted id after an insert.

last_inserted_id = cur.users.insert().value()

Execute only

To simply execute a query without needing its result you call the run() method:

with db.cursor as cur:
    cur.users.add(name='User',
                  email='user@example.com',
                  city='City').run()

    # or
    query = cur.users.add(name='User',
                          email='user@example.com',
                          city='City')
    query.run()

This is handy if you only want to execute the query, e.g. for DML statements like INSERT, UPDATE or DELETE, where you don’t need a fetch call.

Getting data in chunks

quma supports the fetchmany method of Python’s DBAPI through the many() method of Query. many() returns an instance of ManyResult, whose get() method internally calls the fetchmany method of the underlying cursor.

many_users = cur.users.by_city(city='City').many()
first_two = many_users.get(2) # the first call of get executes the query
next_three = many_users.get(3)
next_two = many_users.get(2)

Another example:

def users_generator():
    with db.cursor as cur:
        many_users = cur.users.all().many()
        batch = many_users.get(3) # the first call of get executes the query
        while batch:
            for result in batch:
                yield result
            batch = many_users.get(3)

for user in users_generator():
    print(user.name)
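Since get() delegates to the cursor's fetchmany, it may help to see how fetchmany itself behaves. The sketch below uses the standard library sqlite3 module directly, without quma; the table and its data are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (name) VALUES (?)",
    [(f"User {i}",) for i in range(1, 8)],  # seven rows in total
)

cur = conn.execute("SELECT name FROM users ORDER BY id")
first_two = cur.fetchmany(2)   # rows 1 and 2
next_three = cur.fetchmany(3)  # rows 3 to 5
rest = cur.fetchmany(5)        # only two rows are left
exhausted = cur.fetchmany(5)   # an empty list signals the end of the result
```

Each call continues where the previous one stopped, and an empty batch ends a loop such as the `while batch:` loop in users_generator.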

Note

In contrast to all other fetching methods of the query object, like all(), first(), or one(), a call of many() will not execute the query. Instead, the first call of the get() method of a many result object will cause the execution. Also, the results of many() calls are not cached, and if a query was already executed, the many mechanism will execute it again anyway. So keep in mind that already executed queries will be re-executed when many() is called after the first execution, as in:

all_users = cur.users.all()
first_user = all_users.first() # query executed the first time
many_users = all_users.many()
first_two = many_users.get(2) # query executed a second time

Additionally, the cache of all_users from the last example will be invalidated after the first call of get(). So you should avoid mixing many queries with “normal” queries.

A simpler version of many()

If your expected result set is too large for simply iterating over the query object or calling all() (both call fetchall internally), but you would like to work with the result in a single simple loop instead of using many(), you can use the unbunch() method. It is a convenience method which internally calls fetchmany with the given size. Using unbunch(), we can simplify the many() example with the users_generator from the last section:

with db.cursor as cur:
    for user in cur.users.all().unbunch(3):
        print(user.name)

unbunch() re-executes the query and invalidates the cache on each call, just like many().
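The idea behind such a helper can be sketched as a generator around fetchmany. This is an illustration using the standard library sqlite3 module and is not quma's implementation; the free function unbunch() below is a hypothetical equivalent that takes an already executed cursor.

```python
import sqlite3


def unbunch(cursor, size=None):
    """Yield rows one by one while fetching them from the cursor in batches."""
    while True:
        # Without a size the driver's default (cursor.arraysize) is used.
        batch = cursor.fetchmany() if size is None else cursor.fetchmany(size)
        if not batch:
            return
        yield from batch


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.executemany(
    "INSERT INTO users (name) VALUES (?)",
    [(f"User {i}",) for i in range(1, 8)],
)

cur = conn.execute("SELECT name FROM users ORDER BY id")
names = [row[0] for row in unbunch(cur, 3)]  # fetched in batches of 3, 3 and 1
```

The caller sees a flat stream of rows, while at most one batch is held in memory at a time.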

Getting the number of rows

If you are only interested in the number of rows in a result, you can pass a Query object to the len() function. quma also includes a convenience method called count(). Some drivers (like psycopg2) support the rowcount attribute of PEP 249, which specifies the number of rows that the last execute produced. If it is available, it is used to determine the number of rows; otherwise a fetchall is executed and its result passed to len() to get the number.

number_of_users = len(cur.users.all())
number_of_users = cur.users.all().count()
number_of_users = db.users.all(cur).count()

Note

len() or count() calls must occur before fetch calls like one() or all(). This has to do with the internals of the DBAPI drivers. A fetch would overwrite the value of rowcount, which would return -1 afterwards.
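The fallback from rowcount to fetchall can be sketched as follows. This is a simplified illustration with the standard library sqlite3 module, whose cursors report a rowcount of -1 for SELECT statements; the helper count() is hypothetical and not quma's code.

```python
import sqlite3


def count(cursor):
    """Use cursor.rowcount if the driver reports it, else count fetched rows."""
    if cursor.rowcount >= 0:
        return cursor.rowcount
    # The driver does not know the number of rows: fetch them and count.
    return len(cursor.fetchall())


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.executemany(
    "INSERT INTO users VALUES (?)",
    [("User 1",), ("User 2",), ("User 3",)],
)

select_cur = conn.execute("SELECT name FROM users")
reported = select_cur.rowcount       # -1: sqlite3 gives no count for SELECT
number_of_users = count(select_cur)  # falls back to len(fetchall())
```

With a driver that reports rowcount for SELECT statements, the first branch would be taken and no rows would have to be fetched.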

Checking if a result exists

To check whether a query has a result, call the exists() method.

has_users = cur.users.all().exists()

You can also use the query object itself for truth value testing:

all_users = cur.users.all()
if all_users:
    user1 = all_users.first()

Ad hoc queries

To run an ad hoc query you can use the query method of the cursor:

with db.cursor as cur:
    sql = 'SELECT name, city FROM users WHERE email = ?;'
    user = cur.query(sql, 'user.1@example.com').one()

Prepared statements

quma does not have a special API for prepared statements, but you can easily use them. In the following example we use PostgreSQL’s syntax. Given a SQL script sqlscripts/users/pgsql_prepare.sql with the content below …

PREPARE prep (varchar(128), int) AS
SELECT name, city FROM users WHERE email = $1 AND 1 = $2;

… you can use it as shown here:

with db.cursor as cur:
    cur.users.pgsql_prepare().run()
    for i in range(1, 5):
        q = cur.query(f"EXECUTE prep('user.{i}@example.com', 1);")
        assert q.value() == f'User {i}'
    cur.query("DEALLOCATE PREPARE prep;").run()

Results are cached

As described above, quma executes queries lazily. The data is fetched only on the first call of a method or when an iteration over the query object starts. The fetched result is cached in the query object, so you can perform more than one operation on the object without the query being re-executed. If you want to re-execute it, you need to call run() manually.

with db.cursor as cur:
    all_users = cur.users.all()

    for user in all_users:
        # the result is fetched and cached on the first iteration
        print(user.name)

    # get a list of all users from the cache
    all_users.all()
    # get the first user from the cache
    all_users.first()

    # re-execute the query
    all_users.run()

    # fetch and cache the new result of the re-executed query
    all_users.all()
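The caching behaviour can be modelled with a few lines of plain Python. The sketch below uses the standard library sqlite3 module; the class CachedQuery and its attributes are hypothetical and only mimic the behaviour described in this section, they are not quma's implementation.

```python
import sqlite3


class CachedQuery:
    """Hypothetical sketch of a query object that caches its fetched rows."""

    def __init__(self, conn, sql):
        self.conn = conn
        self.sql = sql
        self._cursor = None  # set by run()
        self._rows = None    # None means "nothing fetched yet"
        self.executions = 0

    def run(self):
        """Execute (or re-execute) the query and drop any cached rows."""
        self.executions += 1
        self._cursor = self.conn.execute(self.sql)
        self._rows = None
        return self

    def all(self):
        if self._rows is None:
            if self._cursor is None:
                self.run()  # lazy first execution
            self._rows = self._cursor.fetchall()
        return self._rows

    def first(self):
        rows = self.all()
        return rows[0] if rows else None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.execute("INSERT INTO users VALUES ('User 1')")

query = CachedQuery(conn, "SELECT name FROM users ORDER BY name")
first_result = query.all()   # executes the query and caches the rows
conn.execute("INSERT INTO users VALUES ('User 2')")
cached_result = query.all()  # served from the cache, the new row is not visible
query.run()                  # explicit re-execution
fresh_result = query.all()   # fetched again, now including the new row
```

Until run() is called, every operation works on the rows fetched by the first execution, even if the table has changed in the meantime.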

Accessing the underlying cursor

You can access the attributes of the cursor which is used to execute the query directly on the query object.

with db.cursor as cur:
    added = cur.users.add(name='User', email='user.1@example.com').run()
    if added.lastrowid:
        user = cur.users.by_id(id=added.lastrowid).run()
        user.fetchone()

Overview

Class Query

class quma.query.Query(script, cursor, args, kwargs, prepare_params)

The query object is the value you get when you run a query, i.e. call a Script object.

all()

Return a list of all results.

count()

Return the length of the result.

exists()

Return whether the query’s result has rows.

first()

Return the first row of the result, or None if there is no row present in the result.

many()

Return a ManyResult object initialized with this query object.

one()

Get exactly one row and check if only one exists, otherwise raise an error.

run()

Execute the query using the DBAPI driver.

unbunch(size=None)

Return a generator that simplifies the use of fetchmany.

Parameters: size – The number of rows to be fetched per fetchmany call. If not given, use the default value of the driver.

value(key=0)

Call one() and return the first column by default.

Parameters: key – Return the value at key’s position instead of the first column.

Class ManyResult

class quma.query.ManyResult(query)

get(size=None)

Call the fetchmany() method of the raw cursor.

Parameters: size – The number of rows to be returned. If not given, use the default value of the driver.