Module beanstalkd

This module provides a client API to a beanstalkd server for both producers and consumers.

For more details on the underlying beanstalkd protocol, or on the lifecycle of a beanstalkd job, see the beanstalkd protocol specification.

Types

StatusCode = enum
  badFormat, buried, deadlineSoon, deleted, draining, expectedCrLf, found,
  inserted, internalError, jobTooBig, kicked, notFound, notIgnored, ok,
  outOfMemory, paused, released, reserved, timedOut, touched, unknownCommand,
  unknownResponse, usingOk, watching
Represents all response messages that the beanstalkd service can return.
BeanResponse = tuple[success: bool, status: StatusCode]
This type is returned by many of the beanstalkd procedures. It tells you if the operation was a success, and provides the status returned by the operation.
BeanIntResponse = tuple[success: bool, status: StatusCode, value: int]
This type is returned by beanstalkd procedures that need to return an integer.
BeanJob = tuple[success: bool, status: StatusCode, id: int, job: string]
This tuple is returned by beanstalkd procedures used to retrieve jobs. The actual job data is held by the job property.

Procs

proc open(address: string; port = Port(11300)): Socket {.raises: [OSError],
    tags: [ReadIOEffect].}
Opens a socket connection to a beanstalkd server.
proc listTubes(socket: Socket): seq[string] {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Get a list of all available tubes.
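
As a rough sketch of how open and listTubes fit together, the snippet below connects, prints the tube names, and closes the connection. It assumes a beanstalkd server is listening on 127.0.0.1 at the default port, and it qualifies the call as beanstalkd.open to avoid any ambiguity with Nim's built-in open for files.

```nim
import beanstalkd

try:
  # Assumption: a beanstalkd server is running locally on port 11300.
  let s = beanstalkd.open("127.0.0.1")
  echo s.listTubes()   # a fresh server is expected to report only "default"
  s.quit()
except OSError as e:
  # open is documented to raise OSError when the connection fails.
  echo "could not connect: ", e.msg
```
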
proc use(socket: Socket; tube: string): BeanResponse {.
    raises: [OSError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Used by job producers to specify which tube to put jobs into. By default, jobs go to the "default" tube (duh!).
proc listTubeUsed(socket: Socket): string {.raises: [OSError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Get the name of the currently used tube (where new jobs will be put).

The name is a bit strange, since it does not provide a list, but it's the name chosen by the beanstalkd author :)

proc usedTube(socket: Socket): string {.raises: [OSError, TimeoutError], tags: [
    WriteIOEffect, ReadIOEffect, TimeEffect].}
This is an alias for listTubeUsed.
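
A minimal sketch of selecting a tube and reading the selection back. The tube name "emails" is made up for illustration, and a local server on the default port is assumed.

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")

# "emails" is a hypothetical tube name.
let r = s.use("emails")
if r.success:
  # usedTube is the alias for listTubeUsed.
  echo "now using: ", s.usedTube()
else:
  echo "use failed with status: ", r.status

s.quit()
```
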
proc put(socket: Socket; data: string; pri = 100; delay = 0; ttr = 5): BeanIntResponse {.
    raises: [ValueError, OSError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Inserts a job into the currently used tube (see use).

data is the job body.

pri is an integer < 2**32. Jobs with smaller priority values will be scheduled before jobs with larger priorities. The most urgent priority is 0; the least urgent priority is 4,294,967,295.

delay is an integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time.

ttr -- time to run -- is an integer number of seconds to allow a worker to run this job. This time is counted from the moment a worker reserves this job. If the worker does not delete, release, or bury the job within ttr seconds, the job will time out and the server will release the job. The minimum ttr is 1. If the client sends 0, the server will silently increase the ttr to 1.

Upon success put returns a BeanIntResponse containing the id of the inserted job.

Note that the server may run out of memory trying to grow the priority queue data structure. In that case the job should still be created, and put reports success with the new id, but the status will be buried instead of the usual inserted.
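
The parameters described above could be used from a producer roughly like this. The tube name and job body are invented for the example, a local server is assumed, and the sketch assumes the job id is carried in the value field of the returned BeanIntResponse.

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
discard s.use("emails")   # hypothetical tube name

# Most urgent priority (0), no delay, and 60 seconds for a worker to finish.
let r = s.put("send welcome mail", pri = 0, delay = 0, ttr = 60)
if r.success:
  # The status is normally inserted; buried signals the out-of-memory case.
  echo "job id ", r.value, " status ", r.status
else:
  echo "put failed: ", r.status

s.quit()
```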

proc watch(socket: Socket; tube: string): BeanIntResponse {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

The watch command adds the named tube to the watch list for the current connection. A reserve command will take a job from any of the tubes in the watch list. For each new connection, the watch list initially consists of one tube, named "default".

tube is a name of at most 200 bytes. It specifies a tube to add to the watch list. If the tube doesn't exist, it will be created.

watch returns the number of tubes currently in the watch list.

proc ignore(socket: Socket; tube: string): BeanIntResponse {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Removes the named tube from the watch list for the current connection.
proc listTubesWatched(socket: Socket): seq[string] {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Returns the names of the tubes currently watched by the connection. A reserve command will take a job from any of these tubes.
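
A consumer that only cares about one tube would typically watch it and then ignore "default". A small sketch, assuming a local server and a made-up tube name:

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")

# "emails" is a hypothetical tube name; watch creates it if needed.
let w = s.watch("emails")
echo "tubes in watch list: ", w.value

# Stop receiving jobs from the initial "default" tube.
discard s.ignore("default")
echo s.listTubesWatched()

s.quit()
```
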
proc reserve(socket: Socket; timeout = - 1): BeanJob {.
    raises: [OSError, ValueError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Reserves and returns a job. If no job is available to be reserved but a timeout > 0 is specified, reserve will block for up to the specified number of seconds, or until a job becomes available.

The default timeout value of -1 makes reserve block and wait for new jobs indefinitely.
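
A simple worker loop built on reserve might look like the sketch below. It assumes a local server and a hypothetical "emails" tube, and it presumes that an expired timeout is reported as an unsuccessful BeanJob (for instance with status timedOut) rather than as an exception.

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
discard s.watch("emails")   # hypothetical tube name

while true:
  # Wait at most 5 seconds for a job instead of blocking forever.
  let job = s.reserve(timeout = 5)
  if not job.success:
    echo "no job reserved: ", job.status
    break
  echo "processing job ", job.id, ": ", job.job
  # Remove the job once the work is done.
  discard s.delete(job.id)

s.quit()
```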

proc touch(socket: Socket; id: int): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

touch allows a worker to request more time to work on a job. This is useful for jobs that potentially take a long time, but you still want the benefits of a TTR pulling a job away from an unresponsive worker. A worker may periodically tell the server that it's still alive and processing a job. The command postpones the auto release of a reserved job until TTR seconds from when the command is issued.

id is the ID of a job reserved by the current connection.
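
For a long-running job, a worker can call touch between slices of work. The sketch below uses sleep as a stand-in for real work and assumes a local server; the three-step loop is arbitrary.

```nim
import os          # for sleep
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
let job = s.reserve(timeout = 5)

if job.success:
  var stillOurs = true
  for step in 1 .. 3:
    sleep(1000)                      # stand-in for one slice of real work
    if not s.touch(job.id).success:  # ask the server for a fresh TTR window
      stillOurs = false
      break
  # Only delete the job if every touch succeeded.
  if stillOurs:
    discard s.delete(job.id)

s.quit()
```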

proc delete(socket: Socket; id: int): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Removes a job from the server entirely. It is normally used by the client when the job has successfully run to completion. A client can delete jobs that it has reserved, ready jobs, delayed jobs, and jobs that are buried.

id is the job id to delete.

proc release(socket: Socket; id: int; pri = 100; delay = 0): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Puts a reserved job back into the ready queue (and marks its state as "ready") to be run by any client.

id: The id of the job to release.

pri: A new priority to assign to the job.

delay: An integer number of seconds to wait before putting the job in the ready queue. The job will be in the "delayed" state during this time.

proc bury(socket: Socket; id: int; pri = 100): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Puts a job into the "buried" state. Buried jobs are put into a FIFO linked list and will not be touched by the server again until a client kicks them with the kick command.

id is the job id to bury.

pri is a new priority to assign to the job.
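
One way to combine delete, release and bury is to pick between them based on the outcome of the work. The Outcome type and the process proc below are hypothetical and exist only for this illustration; a local server is assumed.

```nim
import beanstalkd

type Outcome = enum done, retryLater, poison

# Hypothetical job handler: the rules here are invented for the example.
proc process(body: string): Outcome =
  if body.len == 0: poison
  elif body[0] == '!': retryLater
  else: done

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
let job = s.reserve(timeout = 5)

if job.success:
  case process(job.job)
  of done:
    discard s.delete(job.id)                          # finished, remove it
  of retryLater:
    discard s.release(job.id, pri = 100, delay = 30)  # retry in 30 seconds
  of poison:
    discard s.bury(job.id)                            # park it for inspection

s.quit()
```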

proc kick(socket: Socket; bound: int): BeanIntResponse {.
    raises: [OSError, ValueError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

The kick command applies only to the currently used tube. It moves jobs into the ready queue. If there are any buried jobs, it will only kick buried jobs. Otherwise it will kick delayed jobs.

bound is an integer upper bound on the number of jobs to kick. The server will kick no more than bound jobs.

The returned response contains an integer indicating the number of jobs actually kicked.

proc kickJob(socket: Socket; id: int): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
A variant of kick that operates on a single job identified by its job id. If the given job id exists and is in a buried or delayed state, it will be moved to the ready queue of the same tube where it currently belongs.
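
Because kick works on the currently used tube, a call to use normally comes first. A short sketch, assuming a local server and a made-up tube name:

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
discard s.use("emails")   # hypothetical tube name

# Move at most 10 buried (or, failing that, delayed) jobs to the ready queue.
let k = s.kick(10)
if k.success:
  echo "kicked ", k.value, " job(s)"
else:
  echo "kick failed: ", k.status

s.quit()
```
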
proc pauseTube(socket: Socket; tube: string; delay: int): BeanResponse {.
    raises: [OSError, ValueError, TimeoutError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Prevents any new jobs from being reserved from the given tube for a given time.

tube is the tube to pause.

delay is an integer number of seconds to wait before reserving any more jobs from the queue.

proc peek(socket: Socket; id: int): BeanJob {.
    raises: [OSError, ValueError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Get a job by its id. The job is not reserved by this operation!
proc peekReady(socket: Socket): BeanJob {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Get the highest priority job from the ready queue. The job is not reserved by this operation!
proc peekDelayed(socket: Socket): BeanJob {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Returns the delayed job with the shortest delay left. The job is not reserved by this operation!
proc peekBuried(socket: Socket): BeanJob {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}
Returns the next job in the list of buried jobs. The job is not reserved by this operation!
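
The peek procs are handy for inspecting a job before deciding what to do with it. The sketch below looks at the next buried job and kicks it back to the ready queue. It assumes a local server and a hypothetical tube name; per the beanstalkd protocol, peekBuried looks at the currently used tube.

```nim
import beanstalkd

# Assumption: a beanstalkd server is running locally on port 11300.
let s = beanstalkd.open("127.0.0.1")
discard s.use("emails")   # hypothetical tube name

let b = s.peekBuried()
if b.success:
  echo "buried job ", b.id, ": ", b.job
  # The job was only inspected, not reserved, so kick it by id.
  discard s.kickJob(b.id)
else:
  echo "nothing buried: ", b.status

s.quit()
```
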
proc stats(socket: Socket): seq[string] {.
    raises: [OSError, TimeoutError, OverflowError, ValueError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Gives statistical information about the system as a whole.

A sequence of strings is returned, each of the form "key: value". This may change to something more strongly typed in the future.

proc statsTube(socket: Socket; tube: string): seq[string] {.
    raises: [OSError, ValueError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Gives statistical information about the specified tube if it exists.

A sequence of strings is returned, each of the form "key: value". This may change to something more strongly typed in the future.

proc statsJob(socket: Socket; id: int): seq[string] {.
    raises: [OSError, ValueError, TimeoutError, OverflowError],
    tags: [WriteIOEffect, ReadIOEffect, TimeEffect].}

Gives statistical information about the specified job if it exists.

A sequence of strings is returned, each of the form "key: value". This may change to something more strongly typed in the future.
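
Until the stats procs return something more structured, the "key: value" strings can be turned into a table on the caller's side. The parseStats helper below is not part of this module; it is a small sketch that runs on sample input (with made-up values) so that it works without a server. Lines without a colon are skipped, in case the sequence contains anything other than key/value pairs.

```nim
import strutils, tables

# Hypothetical helper, not part of the beanstalkd module.
proc parseStats(lines: seq[string]): Table[string, string] =
  ## Turns "key: value" lines into a table; lines without a colon are skipped.
  result = initTable[string, string]()
  for line in lines:
    let idx = line.find(':')
    if idx < 0:
      continue
    # Everything before the first colon is the key, the rest is the value.
    result[line[0 ..< idx].strip()] = line.substr(idx + 1).strip()

# Sample input in the documented "key: value" form; the values are made up.
let sample = @["current-jobs-ready: 3", "current-jobs-buried: 0", "name: default"]
let t = parseStats(sample)
echo t["current-jobs-ready"]
```

With a live connection, the same helper could be applied to the result of stats, statsTube or statsJob.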

proc quit(socket: Socket) {.raises: [OSError], tags: [WriteIOEffect].}
Closes the connection.