Source code for errata_tool.erratum

from __future__ import print_function
import datetime
import os
import re
import requests_kerberos
import six
import textwrap
import time

from errata_tool import ErrataException, ErrataConnector, security, User


[docs]class Erratum(ErrataConnector):
[docs] def fmt(self, s): # The textwrap library doesn't parse newlines, so you'll want to # split on them first, then format each line, then join it all back # up. lines = s.split('\n') page = [] for l in lines: b = textwrap.TextWrapper(width=75, replace_whitespace=True, break_long_words=False, break_on_hyphens=False) page.append(b.fill(l)) return '\n'.join(page)
def _do_init(self): self.errata_id = 0 self._original_bugs = [] self._cve_bugs = [] self._original_state = 'NEW_FILES' self._original_json = {} self._product = None # Set when you call new self._release = None # Set when you call new self._new = False # set when you call create self._update = False # Set to true if you update any fields self._format = True # Format fields on update (new adv.) self._buildschanged = False # Set to true if you changed builds # These should be updated with the 'update()' method, and are provided # primarily for debugging/printing by user apps self.errata_type = None self.text_only = False self.text_only_cpe = None self.publish_date_override = None self.publish_date = None self.creation_date = None self.ship_date = None # Set if SHIPPED_LIVE self.age = 0 # Erratum age in days self.package_owner_email = None self.manager_email = None self.manager_id = 0 self.product_id = 0 self.release_id = 0 self.qe_email = '' self.qe_group = '' self.synopsis = None self.topic = None self.description = None self.solution = None self.security_impact = None self.cve_names = None self.errata_bugs = [] self.errata_builds = {} self.current_flags = [] self.missing_prod_listings = [] self.batch_id = None
[docs] def update(self, **kwargs): if 'errata_type' in kwargs: self.errata_type = kwargs['errata_type'] self._update = True if 'security_impact' in kwargs: self.security_impact = kwargs['security_impact'] self._update = True if 'text_only' in kwargs: self.text_only = kwargs['text_only'] self._update = True if 'text_only_cpe' in kwargs: self.text_only_cpe = kwargs['text_only_cpe'] self._update = True if 'date' in kwargs: try: datetime.datetime.strptime(kwargs['date'], '%Y-%b-%d') except ValueError: raise ValueError( 'Date must be of the form: YYYY-MON-DD; 2015-Mar-11') self.publish_date_override = kwargs['date'] self._update = True if 'owner_email' in kwargs: self.package_owner_email = kwargs['owner_email'] self._update = True if 'manager_email' in kwargs: self.manager_email = kwargs['manager_email'] self._update = True if 'manager_id' in kwargs: self.manager_id = kwargs['manager_id'] self._update = True if 'qe_email' in kwargs: self.qe_email = kwargs['qe_email'] self._update = True if 'qe_group' in kwargs: self.qe_group = kwargs['qe_group'] self._update = True if 'synopsis' in kwargs: self.synopsis = kwargs['synopsis'] self._update = True if 'cve_names' in kwargs: self.cve_names = kwargs['cve_names'] self._update = True if 'topic' in kwargs: self.topic = self.fmt(kwargs['topic']) self._update = True if 'description' in kwargs: self.description = self.fmt(kwargs['description']) self._update = True if 'solution' in kwargs: self.solution = self.fmt(kwargs['solution']) self._update = True
def __init__(self, **kwargs): self.ssl_verify = security.security_settings.ssl_verify() # Blank erratum e.g. if create is required self._do_init() if 'errata_id' in kwargs: self._fetch(kwargs['errata_id']) return if 'bug_id' in kwargs: self._fetch_by_bug(kwargs['bug_id']) return if 'product' not in kwargs: raise ErrataException('Creating errata requires a product') if 'release' not in kwargs: raise ErrataException('Creating errata requires a release') if 'format' in kwargs: self._format = kwargs['format'] self._new = True self.errata_name = '(unassigned)' self.errata_state = 'NEW_FILES' self._product = kwargs['product'] self._release = kwargs['release'] self.update(**kwargs) if 'solution' not in kwargs: self.solution = self.fmt("Before applying this update, \ make sure all previously released errata relevant to your system \ have been applied.\n\ \n\ For details on how to apply this update, refer to:\n\ \n\ https://access.redhat.com/articles/11258") # errata tool defaults if 'errata_type' in kwargs: self.errata_type = kwargs['errata_type'] else: self.errata_type = 'RHBA' # Pull down the state of the erratum and store it. def _fetch(self, errata_id): self._new = False self._update = False self._buildschanged = False self.errata_builds = {} self.current_flags = [] try: # TODO: remove call to /advisory/X.json once new API # supports all the information endpoint_list = [ '/advisory/' + str(errata_id) + '.json', '/api/v1/erratum/' + str(errata_id), ] # Want to ditch advisory_old eventually advisory_old = None advisory = None erratum = None for endpoint in endpoint_list: r = self._get(endpoint) if r is None: continue if advisory is None and 'erratum' in endpoint: advisory = r continue # Fallthrough if advisory_old is None: advisory_old = r if advisory is None: print('do not have requested data bailing') return None # Short circuit to get the advisory for key in advisory['errata']: erratum = advisory['errata'][key] self.errata_type = key.upper() break self.errata_id = erratum['id'] # NEW_FILES QE etc. self.errata_state = erratum['status'] self._original_state = self.errata_state self._original_json = erratum # Check if the erratum is under embargo self.embargoed = False self.release_date = erratum['release_date'] if self.release_date is not None: cur = datetime.datetime.utcnow() cur = str(cur).split()[0] if self.release_date > cur: self.embargoed = True # Target Ship date d = erratum['publish_date_override'] if d is not None: pd = time.strptime(str(d), '%Y-%m-%dT%H:%M:%SZ') self.publish_date_override = time.strftime('%Y-%b-%d', pd) # Target Ship date (immutable; e.g. from batch) d = erratum['publish_date'] if d is not None: pd = time.strptime(str(d), '%Y-%m-%dT%H:%M:%SZ') self.publish_date = time.strftime('%Y-%b-%d', pd) # Actual ship date (if in SHIPPED_LIVE) if self.errata_state in ('SHIPPED_LIVE'): d = erratum['actual_ship_date'] # Could be None. e.g. advisory 43686 if d: d = time.strptime(str(d), '%Y-%m-%dT%H:%M:%SZ') self.ship_date = time.strftime('%Y-%b-%d', d) # File date d = erratum['created_at'] d = time.strptime(str(d), '%Y-%m-%dT%H:%M:%SZ') self.creation_date = time.strftime('%Y-%b-%d', d) d = time.strftime('%Y-%b-%d', time.gmtime()) if self.ship_date is not None: d = self.ship_date filed = datetime.datetime.strptime(self.creation_date, '%Y-%b-%d') ship = datetime.datetime.strptime(d, '%Y-%b-%d') age = ship - filed self.age = age.days # Baseline flags. if self.errata_state in ('QE'): if 'sign_requested' in erratum and \ erratum['sign_requested'] == 0: self.addFlags('request_sigs') if 'rhnqa' in erratum and erratum['rhnqa'] == 0: self.addFlags('needs_distqa') if 'doc_complete' in erratum and erratum['doc_complete'] == 0: self.addFlags('needs_docs') if self.errata_state == 'NEW_FILES': self.addFlags('needs_devel') # Note: new errata return values will have other bits. self.errata_name = erratum['fulladvisory'] # Grab immutable fields self._product = advisory_old['product']['short_name'] self._release = advisory_old['release']['name'] # A maybe-empty list, containing eg. "rpm" or "docker" self.content_types = erratum['content_types'] # store product and release IDs self.product_id = advisory_old['product']['id'] self.release_id = advisory_old['release']['id'] self.package_owner_email = advisory_old['people']['package_owner'] self.reporter = advisory_old['people']['reporter'] self.qe_email = advisory_old['people']['assigned_to'] self.qe_group = advisory_old['people']['qe_group'] # XXX Errata tool doesn't report manager? # https://bugzilla.redhat.com/show_bug.cgi?id=1664884 # self.manager_email = ??? self.manager_id = erratum.get('manager_id') # Grab mutable errata content self.text_only = erratum['text_only'] self.synopsis = erratum['synopsis'] content = advisory['content']['content'] self.text_only_cpe = content['text_only_cpe'] self.topic = content['topic'] self.description = content['description'] self.solution = content['solution'] self.errata_bugs = [int(b['bug']['id']) for b in advisory['bugs']['bugs']] self.cve_names = content['cve'] if self.cve_names == '': self.cve_names = None self._original_bugs = list(self.errata_bugs) self._cache_bug_info(self._original_bugs) # Try to check to see if we need devel assistance, qe assistance or # rel prep assistance if self.errata_state == 'QE': self._check_tps() self._check_bugs() self._check_need_rel_prep() # Check for security review if 'rhsa' in advisory['errata']: sa = advisory['errata']['rhsa']['security_approved'] self.security_impact = advisory['errata']['rhsa']['security_impact'] # NOQA if sa is None: self.addFlags('request_security') elif sa is False: self.addFlags('needs_security') check_signatures = self.errata_state != 'NEW_FILES' self._get_build_list(check_signatures) self.batch_id = erratum.get('batch_id') return except RuntimeError: # Requests seems to loop infinitely if this happens... raise ErrataException('Pigeon crap. Did it forget to run kinit?') except IndexError: # errata_id not found raise ErrataException('Errata ID field not found in response') except Exception: # Todo: better handling raise def _check_signature_for_build(self, build): signed = False url = os.path.join('/api/v1/build/', build) nvr_json = self._get(url) if u'rpms_signed' in nvr_json: if nvr_json[u'rpms_signed']: signed = True return signed def _cache_bug_info(self, bug_id_list): # Omitted: RHOS shale's use of bz_cache here. pass
[docs] def metadataCdnRepos(self, enable=[], disable=[]): """Get or set the CDN repos for this advisory. Note: This method applies only for advisories containing Docker images. When called with no arguments, this method returns all available CDN repos for advisory metadata. Otherwise you may enable or disable repos here. :param enable: (optional) A list of CDN repos to enable. Example: ["rhel-7-server-rhceph-3-mon-rpms__x86_64"] :param disable: (optional) A list of CDN repos to disable. :returns: a list of dicts about each available repo, and whether they are enabled or disabled. """ return self._cdn_repos('metadata_cdn_repos', enable, disable)
[docs] def textOnlyRepos(self, enable=[], disable=[]): """Get or set the text-only repos for this advisory. Note: This method applies only for text-only advisories. When called with no arguments, this method returns all available CDN repos for the advisory text. Otherwise you may enable or disable repos here. :param enable: (optional) A list of CDN repos to enable. Example: ["rhel-7-server-rhceph-3-mon-rpms__x86_64"] :param disable: (optional) A list of CDN repos to disable. :returns: a list of dicts about each available repo, and whether they are enabled or disabled. """ return self._cdn_repos('text_only_repos', enable, disable)
def _cdn_repos(self, endpoint, enable=[], disable=[]): """Get or set the repos for this advisory. Use this for setting repos on text-only or docker advisories. :param endpoint: The erratum API endpoint to request. Example: "metadata_cdn_repos" or "text_only_repos". :param enable: (optional) A list of CDN repos to enable. Example: ["rhel-7-server-rhceph-3-mon-rpms__x86_64"] :param disable: (optional) A list of CDN repos to disable. :returns: a list of dicts about each available repo, and whether they are enabled or disabled. """ if endpoint not in ('metadata_cdn_repos', 'text_only_repos'): raise ValueError('unsupported endpoint %s', endpoint) url = '/api/v1/erratum/%d/%s' % (self.errata_id, endpoint) if not enable and not disable: return self._get(url) payload = [{'enabled': True, 'repo': repo} for repo in enable] payload += [{'enabled': False, 'repo': repo} for repo in disable] result = self._put(url, json=payload) # XXX we should fix error handling and return values in _put() to work # like _get() currently does. result.raise_for_status() return result.json() def _check_tps(self): # Check for TPS failure (QE state only) url = '/advisory/%i/tps_jobs.json' % self.errata_id r = self._get(url) distqa_tps = 0 distqa_passing = 0 for tps in r: if tps['rhnqa'] is True: distqa_tps = distqa_tps + 1 if tps['state'] == 'BAD' or \ 'failed to generate' in tps['state']: self.addFlags('tps_errors') continue if tps['state'] in ('BUSY', 'NOT_STARTED'): self.addFlags('tps_wait') continue if tps['rhnqa'] is True: distqa_passing = distqa_passing + 1 # Assume testing is done... ;) if distqa_tps > 0 and distqa_passing != distqa_tps: self.addFlags('needs_distqa') self.need_rel_prep = False else: self.need_rel_prep = True def _check_bugs(self): pass def _check_need_rel_prep(self): # Omitted: RHOS shale's "need_rel_prep" here, uses bz_cache. pass
[docs] def externalTests(self, test_type=None): """Get active external test results for this advisory. :param test_type: str, like "rpmdiff" or "covscan" :returns: a possibly-empty list of dicts, one per result. """ tmpl = '/api/v1/external_tests/?filter[active]=true' tmpl += '&filter[errata_id]={errata_id}' if test_type: tmpl += '&filter[test_type]={test_type}' url = tmpl.format(errata_id=self.errata_id, test_type=test_type) data = self.get_paginated_data(url) return data
def _get_build_list(self, check_signatures=False): # Grab build list; store on a per-key basis # REFERENCE # Item 5.2.10.3. GET /advisory/{id}/builds.json # Then try to check to see if they are signed or not # Item 5.2.2.1. GET /api/v1/build/{id_or_nvr} url = "/advisory/%i/builds.json" % self.errata_id product_versions = self._get(url) have_all_sigs = True for product_version in product_versions: builds = [] for pv_builds in product_versions[product_version]: for nvr, mappings in six.iteritems(pv_builds): builds.append(nvr) if not mappings: self.missing_prod_listings.append(nvr) if have_all_sigs and check_signatures: if not self._check_signature_for_build(nvr): self.addFlags('needs_sigs') have_all_sigs = False self.errata_builds[product_version] = builds if have_all_sigs: self.removeFlags(['request_sigs', 'needs_sigs']) def _fetch_by_bug(self, bug_id): # print("fetch_by_bug") try: url = "/bugs/%i/advisories.json" % bug_id rj = self._get(url) stored = False for e in rj: if not stored: stored = True self._fetch(e['id']) else: print('Warning: Ignoring additional erratum ' + str(e['id']) + ' for bug ', str(bug_id)) except RuntimeError: # Requests seems to loop infinitely if this happens... raise ErrataException('Pigeon crap. Did it forget to run kinit?') except IndexError: # errata_id not found raise ErrataException('Errata ID field not found in response') except LookupError: # Errata not found pass except Exception: # Todo: better handling raise
[docs] def refresh(self): if self.errata_id != 0: self._fetch(self.errata_id)
[docs] def reloadBuilds(self, no_rpm_listing_only=False, no_current_files_only=False): val = { 'no_rpm_listing_only': int(no_rpm_listing_only), 'no_current_files_only': int(no_current_files_only), } url = '/api/v1/erratum/%d/reload_builds' % self.errata_id r = self._post(url, data=val) self._processResponse(r) return r.json()
[docs] def setState(self, state): if self._new: raise ErrataException('Cannot simultaneously create and change ' + 'an erratum\'s state') if self.errata_id == 0: raise ErrataException('Cannot change state for uninitialized ' + 'erratum') if self.errata_state.upper() == 'NEW_FILES': if state.upper() == 'QE': self.errata_state = 'QE' elif self.errata_state.upper() == 'QE': if state.upper() == 'NEW_FILES': self.errata_state = 'NEW_FILES' if state.upper() == 'REL_PREP': self.errata_state = 'REL_PREP' elif self.errata_state.upper() == 'REL_PREP': if state.upper() == 'NEW_FILES': self.errata_state = 'NEW_FILES' if state.upper() == 'QE': self.errata_state = 'QE' else: raise ErrataException('Cannot change state from ' + self.errata_state.upper() + " to " + state.upper())
def _addBug(self, b): if not isinstance(b, int): b = int(b) if self.errata_bugs is None: self.errata_bugs = [] self.errata_bugs.append(b) return if b not in self.errata_bugs: self.errata_bugs.append(b)
[docs] def addBugs(self, buglist): if isinstance(buglist, int): self._addBug(buglist) return for b in buglist: self._addBug(b)
def _removeBug(self, b): if not isinstance(b, int): b = int(b) if b in self.errata_bugs: self.errata_bugs.remove(b)
[docs] def removeBugs(self, buglist): if isinstance(buglist, int): self._removeBug(buglist) return for b in buglist: self._removeBug(b)
# Omitted: RHOS shale's syncBugs()
[docs] def syncBugs(self): raise NotImplementedError('RHOS-only method')
# Omitted: RHOS shale's findMissingBuilds()
[docs] def findMissingBuilds(self): raise NotImplementedError('RHOS-only method')
[docs] def changeDocsReviewer(self, login_name): val = {'login_name': login_name} url = '/api/v1/erratum/%d/change_docs_reviewer' % self.errata_id r = self._post(url, data=val) self._processResponse(r)
[docs] def addCC(self, email): """Add someone to the CC list for this advisory. """ # rhbz#1572000 will add an official API for this. val = {'id': self.errata_id, 'email': email} url = '/carbon_copies/add_to_cc_list' r = self._post(url, data=val) self._processResponse(r)
# # Flag list could be replaced with a set at some # point. # # Some flags are tracked and managed here in # errata-tool, but users can add their own as well. #
[docs] def addFlags(self, flags): if not isinstance(flags, list): flags = [flags] # Two loops intentionally. First one is for # input validation. for f in flags: if not isinstance(f, str): raise ValueError('flag ' + str(f) + ' is not a string') for f in flags: if f not in self.current_flags: self.current_flags.append(f)
[docs] def removeFlags(self, flags): if not isinstance(flags, list): flags = [flags] # Two loops intentionally. First one is for # input validation. for f in flags: if not isinstance(f, str): raise ValueError('flag ' + str(f) + ' is not a string') for f in flags: if f in self.current_flags: self.current_flags.remove(f)
# Adding and removing builds can't be done atomically. Wondering whether
[docs] def addBuildsDirect(self, buildlist, release, **kwargs): if 'file_types' not in kwargs: file_types = None else: file_types = kwargs['file_types'] blist = [] if isinstance(buildlist, six.string_types): blist.append(buildlist) else: blist = buildlist # Adding builds # List of dicts. pdata = [] for b in blist: # Avoid double-add if release in self.errata_builds and \ b in self.errata_builds[release]: continue val = {} if file_types is not None and b in file_types: val['file_types'] = file_types[b] val['build'] = b val['product_version'] = release pdata.append(val) url = "/api/v1/erratum/%i/add_builds" % self.errata_id r = self._post(url, json=pdata) self._processResponse(r) self._buildschanged = True return
[docs] def addBuilds(self, buildlist, **kwargs): """Add Build(s) to erratum""" if self._new: raise ErrataException('Cannot add builds to unfiled erratum') release = None if 'release' in kwargs: release = kwargs['release'] del kwargs['release'] if release is None and len(self.errata_builds.keys()) == 1: release = list(self.errata_builds.keys())[0] if release is None: raise ErrataException('Need to specify a release') return self.addBuildsDirect(buildlist, release, **kwargs)
[docs] def setFileInfo(self, file_info): # XXX API broken?? if not isinstance(file_info, dict): raise ValueError('file_info is not a dict') if len(file_info) < 1: return # Get: url = '/api/v1/erratum/%i/filemeta' % self.errata_id r = self._get(url) info = [] files = [k for k in file_info] for f in r: # print(f['file']['path'] + f['file']['id']) fn = os.path.basename(f['file']['path']) if fn in files: info.append({'file': f['file']['id'], 'title': file_info[fn]['title']}) # print(info) # Set: # url += '?put_rank=true' r = self._put(url, data=info) self._processResponse(r)
[docs] def removeBuilds(self, buildlist): """Remove build(s) from advisory""" if not isinstance(buildlist, (str, list)): raise IndexError # Removing builds # REFERENCE if isinstance(buildlist, six.string_types): builds = [] if len(buildlist.strip()) == 0: raise IndexError builds.append(buildlist.strip()) else: builds = buildlist if len(builds) == 0: raise IndexError for b in builds: val = {} val['nvr'] = b url = "/api/v1/erratum/%i/remove_build" % self.errata_id r = self._post(url, data=val) self._processResponse(r) self._buildschanged = True
def _write(self): pdata = {} # See below for APIs used when talking to the errata tool. if self._new: if self.package_owner_email is None: raise ErrataException("Can't create erratum without " + "package owner email") if self.manager_email is None: if self.manager_id: manager = User(self.manager_id) self.manager_email = manager.email_address else: raise ErrataException("Can't create erratum without " + "manager email or manager id") if self._product is None: raise ErrataException("Can't create erratum with no " + "product specified") if self._release is None: raise ErrataException("Can't create erratum with no " + "release specified") if self.errata_type is None: self.errata_type = 'RHBA' pdata['product'] = self._product pdata['release'] = self._release if self.package_owner_email is not None: pdata['advisory[package_owner_email]'] = self.package_owner_email if self.manager_email is not None: pdata['advisory[manager_email]'] = self.manager_email if self.qe_email is not None and self.qe_email != '': pdata['advisory[assigned_to_email]'] = self.qe_email if self.qe_group is not None and self.qe_group != '': pdata['advisory[quality_responsibility_name]'] = self.qe_group if self.synopsis is None: raise ErrataException("Can't write erratum without synopsis") if self.topic is None: raise ErrataException("Can't write erratum without topic") if self.description is None: raise ErrataException("Can't write erratum without description") if self.solution is None: raise ErrataException("Can't write erratum without a solution") if self.errata_bugs is None: raise ErrataException("Can't write erratum without a list of " + "bugs") # Default from errata tool pdata['advisory[errata_type]'] = self.errata_type # POST/PUT a 1 or 0 value for this text_only boolean pdata['advisory[text_only]'] = int(self.text_only) if self.text_only_cpe: pdata['advisory[text_only_cpe]'] = self.text_only_cpe if self.publish_date_override: pdata['advisory[publish_date_override]'] = \ self.publish_date_override # ET automagically handles the severity for the synopsis in RHSA's # but will still see it as a docs change if we write the same one # back again, so remove it. if self.errata_type == 'RHSA': severity = r'^(Low|Moderate|Important|Critical): ' self.synopsis = re.sub(severity, "", self.synopsis) pdata['advisory[cve]'] = self.cve_names val = 'None' if self.security_impact is not None: val = self.security_impact pdata['advisory[security_impact]'] = val pdata['advisory[synopsis]'] = self.synopsis pdata['advisory[topic]'] = self.topic pdata['advisory[description]'] = self.description pdata['advisory[solution]'] = self.solution # XXX Delete all bugs is a special case last_bug = None if len(self.errata_bugs) == 0 and len(self._original_bugs) > 0: last_bug = self._original_bugs[0] self.errata_bugs = [last_bug] # Add back any Vulnerability bugs allbugs = list(set(self.errata_bugs) | set(self._cve_bugs)) idsfixed = ' '.join(str(i) for i in allbugs) pdata['advisory[idsfixed]'] = idsfixed # Sync newly added bug states newbugs = list(set(allbugs) - set(self._original_bugs)) if len(newbugs): # url = '/api/v1/bug/refresh' # print(allbugs) # r = self._post(url, data=newbugs) # self._processResponse(r) # ^ XXX broken # # XXX Sync bug states by force using UI # Note: UI limits syncs to 100 bugs per run, so split # up into chunks syncs = [newbugs[x:x + 100] for x in range(0, len(newbugs), 100)] bug_list = {} for s in syncs: bug_list['issue_list'] = ' '.join(str(i) for i in s) url = "/bugs/sync_bug_list" r = self._post(url, data=bug_list) # XXX should we process return code? # Push it if self._new: # REFERENCE # New is 'POST' url = "/api/v1/erratum" r = self._post(url, data=pdata) self._processResponse(r) rj = r.json() json_errata_type = self.errata_type.lower() self.errata_id = rj['errata'][json_errata_type]['errata_id'] # XXX return JSON returns full advisory name but not # typical advisory name - e.g. RHSA-2015:19999-01, but not # RHSA-2015:19999, but it's close enough self.errata_name = rj['errata'][json_errata_type]['fulladvisory'] else: # REFERENCE # Update is 'PUT' url = "/api/v1/erratum/%i" % self.errata_id r = self._put(url, data=pdata) self._processResponse(r) # XXX WOW VERY HACK # If deleting last bug... if last_bug is not None: # This doesn't work to remove the last bug, nor does setting # idsfixed to empty-string # url = "/api/v1/erratum/%i/remove_bug" % self.errata_id # pdata = {'bug': str(last_bug)} # Solution: Use hacks to pretend we're using the remove-bugs # web UI :( url = '/bugs/remove_bugs_from_errata/%i' % self.errata_id pdata = {} pdata['bug[' + str(last_bug) + ']'] = 1 # Handle weird interaction we get in this particular case try: r = self._post(url, data=pdata) except requests_kerberos.exceptions.MutualAuthenticationError: pass self._processResponse(r) def _putStatus(self): # REFERENCE # State change is 'POST' pdata = {} pdata['new_state'] = self.errata_state url = "/api/v1/erratum/%i" % self.errata_id url += "/change_state" r = self._post(url, data=pdata) self._processResponse(r)
[docs] def commit(self): ret = False # Commit changes if self._new: self._write() self.refresh() # self.syncBugs() # RHOS shale only return # XXX Not atomic, but we should refresh on commit if self._buildschanged: ret = True try: # Special case: # If new state is 'NEW_FILES', set it before anything else if (self._original_state != self.errata_state and self.errata_state.upper() == 'NEW_FILES'): self._putStatus() ret = True # Update buglist if it changed # Errata tool is very slow - don't PUT if it hasn't changed allbugs = list(set(self.errata_bugs) | set(self._cve_bugs)) if sorted(self._original_bugs) != sorted(allbugs) \ or self._update: self._write() # self.syncBugs() # RHOS shale only ret = True # Perhaps someone did addbugs + setState('QE') if (self._original_state != self.errata_state and self.errata_state.upper() != 'NEW_FILES'): self._putStatus() ret = True except ErrataException: raise if ret: self.refresh() return ret
[docs] def push(self, target='stage'): """Push an advisory to "stage", "live", or both. :param target: A string "stage" or "live". Defaults to "stage". You can also pass a list here, ['stage', 'live'] to do both in one operation. :returns: a list describing the Errata Tool's newly triggered push tasks. Each push task includes an "id". You can query the status of this push ID at /api/v1/erratum/{id}/push/{push_id} . """ # Accept 'stage', 'live', or a set of specific options if self.errata_id == 0: return False # Basic mode: 'stage' or 'live' url = '/api/v1/erratum/' + str(self.errata_id) + '/push' if isinstance(target, str): if target != 'stage' and target != 'live': raise ValueError('Wrong value for target: expected ' + '\'stage\', \'live\', or a list') r = self._post(url + '?defaults=' + str(target)) self._processResponse(r) return r.json() # Advanced mode: see ET documentation. if not isinstance(target, list): raise ValueError('Wrong value for target: expected ' + '\'stage\', \'live\', or a list') r = self._post(url, data=target) self._processResponse(r) return r.json()
[docs] def dump(self): print(self) print("Package Owner Email: " + self.package_owner_email) print("Manager Email: " + self.manager_email) print("QE: " + self.qe_email + " " + self.qe_group) print("Type: " + self.errata_type) if self.creation_date is not None: print("Created: " + self.creation_date) if self.errata_state == 'SHIPPED_LIVE': print("Shipped: " + self.ship_date) print("Age: " + str(self.age) + " days") if len(self.current_flags) > 0: print("Flags: " + ' '.join(self.current_flags)) print("Synopsis: " + self.synopsis) if self.publish_date_override is not None: print('') print("Ship Target: {0}".format(self.publish_date_override)) if self.publish_date is not None: print('') print("Ship Target: {0}".format(self.publish_date)) if self.batch_id is not None: print('') print("Batch: {0}".format(self.batch_id)) print("Ship Target: {0}".format(self.publish_date)) print('') print("Topic") print("=====") print(self.topic) print('') print("Description") print("===========") print(self.description) print('') print("Solution") print("========") print(self.solution)
[docs] def url(self): return super(Erratum, self).canonical_url("/advisory/" + str(self.errata_id))
[docs] def get_erratum_data(self): """Return the server's JSON data for this advisory. This returns the JSON response from the Errata Tool's REST API /advisory/<errata_id>.json and /api/v1/erratum/<errata_id> endpoints. Use this when debugging interactions with the Errata Tool or when passing this data on to other non-Python tools. :returns: a dict that is simply the parsed JSON from the server. """ return dict(self._original_json) # shallow copy
def __lt__(self, other): return self.errata_id < other.errata_id def __gt__(self, other): return self.errata_id > other.errata_id def __eq__(self, other): return self.errata_id == other.errata_id def __le__(self, other): return self.errata_id <= other.errata_id def __ge__(self, other): return self.errata_id >= other.errata_id def __ne__(self, other): return self.errata_id != other.errata_id def __str__(self): s = "\n builds: \n" for k in self.errata_builds: s = s + " " + k + "\n" for b in sorted(self.errata_builds[k], key=lambda x: x.lower()): s = s + " " + b + "\n" if len(self.current_flags) > 0: s = "\n Flags: " + ' '.join(self.current_flags) + s if len(self._cve_bugs) > 0: s = "\n Impact: " + str(self.security_impact) + s s = "\n CVE bugs: " + str(self._cve_bugs) + s if self.cve_names is not None: s = "\n CVEs: " + str(self.cve_names) + s pdate = self.publish_date_override if pdate is None and self.publish_date is not None: pdate = self.publish_date if self.batch_id is not None and self.publish_date is not None: pdate = self.publish_date return self.errata_name + ": " + self.synopsis + \ "\n package owner: " + self.package_owner_email + \ " qe: " + self.qe_email + \ " qe_group: " + self.qe_group + \ "\n url: " + \ self.url() + \ "\n state: " + self.errata_state + \ "\n created: " + str(self.creation_date) + \ "\n ship target: " + str(pdate) + \ "\n batch_id: " + str(self.batch_id) + \ "\n ship date: " + str(self.ship_date) + \ "\n age: " + str(self.age) + " days" \ "\n bugs: " + str(self.errata_bugs) + \ s def __int__(self): return self.errata_id