# Source code for cup.jenkinslib.internal.jobs

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Copyright: [CUP] - See LICENSE for details.
# Authors: liushuxian(liushuxian)
"""
This module provides Jobs object.
Jobs is a container of Job.
"""

from cup.jenkinslib.internal import exception


class Jobs(object):
    """A container of Job objects.

    Behaves like a mapping keyed by job name. Every lookup reads the
    ``'jobs'`` list held in ``jenkins._data``; methods that change jobs on
    the server call ``jenkins.poll()`` afterwards and then re-read it
    (presumably ``poll()`` is what refreshes ``jenkins._data`` -- confirm
    against the Jenkins class).
    """

    # Upper bound on delete requests issued by a single ``del jobs[name]``.
    _DELETE_ATTEMPTS = 3

    def __init__(self, jenkins):
        """initialize Jobs object.

        Args:
            jenkins: the owning Jenkins object. It must provide ``_data``,
                ``poll()``, ``requester``, ``Job`` and ``get_create_url()``.
        """
        self.jenkins = jenkins
        jenkins.poll()

    def _job_rows(self):
        """Return the raw job rows from the cached data ([] when absent)."""
        return self.jenkins._data.get('jobs', [])

    def __setitem__(self, job_name, config):
        """create new job, same as ``create(job_name, config)``."""
        return self.create(job_name, config)

    def __getitem__(self, job_name):
        """get job by name.

        Args:
            job_name: name of the job to look up.

        Returns:
            a job object built by ``jenkins.Job`` from the first row whose
            name matches.

        Raises:
            exception.UnknownJob: no row with that name is present.
        """
        for row in self._job_rows():
            if row['name'] == job_name:
                return self.jenkins.Job(row['url'], row['name'], self.jenkins)
        raise exception.UnknownJob(job_name)

    def __delitem__(self, job_name):
        """delete job by name.

        Best effort: at most ``_DELETE_ATTEMPTS`` delete requests are sent
        and ``JenkinsAPIError`` is swallowed, so nothing is raised when the
        job is unknown or still listed afterwards. Callers that need
        certainty should check ``job_name in jobs`` after the call.

        Args:
            job_name: name of the job to delete.
        """
        attempts_left = self._DELETE_ATTEMPTS
        while attempts_left and job_name in self:
            # Count every attempt, not only the failing ones: a request that
            # is accepted while the job stays listed must not loop forever.
            attempts_left -= 1
            try:
                delete_job_url = self[job_name].get_delete_url()
                self.jenkins.requester.post_and_confirm_status(
                    delete_job_url,
                    data='some random bytes...')
            except exception.JenkinsAPIError:
                # The job may have been removed although the request was
                # reported as failed; the refresh below decides.
                pass
            try:
                # Refresh after failures too, so the loop condition never
                # works on a stale job list.
                self.jenkins.poll()
            except exception.JenkinsAPIError:
                # Keep the last known listing and spend another attempt.
                pass

    def __contains__(self, job_name):
        """job exists or not."""
        return job_name in self.iterkeys()

    def __len__(self):
        """job num."""
        return len(self.keys())

    def __iter__(self):
        """iterator for job names."""
        return self.iterkeys()

    def iterkeys(self):
        """iterate lazily over all job names."""
        for row in self._job_rows():
            yield row['name']

    def keys(self):
        """get all job names as a list."""
        return list(self.iterkeys())

    def create(self, job_name, config):
        """create new job.

        If a job with this name is already listed, it is returned as is and
        ``config`` is ignored.

        Args:
            job_name: name of new job.
            config: configure for new job, xml text.

        Returns:
            new job object.

        Raises:
            exception.JenkinsAPIError: the job is not listed after the
                create request and the following poll.
        """
        if job_name in self:
            return self[job_name]

        params = {'name': job_name}
        self.jenkins.requester.post_xml_and_confirm_status(
            self.jenkins.get_create_url(),
            data=config,
            params=params)

        self.jenkins.poll()
        if job_name not in self:
            raise exception.JenkinsAPIError('Cannot create job %s' % job_name)

        return self[job_name]

    def rename(self, job_name, new_job_name):
        """rename a job.

        Args:
            job_name: name of a existing job.
            new_job_name: new job name.

        Returns:
            new job object.

        Raises:
            exception.UnknownJob: ``job_name`` is not listed, or
                ``new_job_name`` is not listed after the rename and poll.
        """
        params = {'newName': new_job_name}
        rename_job_url = self[job_name].get_rename_url()
        self.jenkins.requester.post_and_confirm_status(
            rename_job_url,
            data="",
            params=params)
        self.jenkins.poll()
        return self[new_job_name]