HEX
Server: nginx/1.22.1
System: Linux iZuf67d4hh2ssx30nkok6dZ 3.10.0-1160.119.1.el7.x86_64 #1 SMP Tue Jun 4 14:43:51 UTC 2024 x86_64
User: www (1000)
PHP: 7.4.33
Disabled: passthru,exec,system,putenv,chroot,chgrp,chown,shell_exec,popen,proc_open,pcntl_exec,ini_alter,ini_restore,dl,openlog,syslog,readlink,symlink,popepassthru,pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,imap_open,apache_setenv
Upload Files
File: //usr/lib64/python2.7/site-packages/subscription_manager/syspurposelib.py
# -*- coding: utf-8 -*-

from __future__ import print_function, division, absolute_import

#
# Copyright (c) 2018 Red Hat, Inc.
#
# This software is licensed to you under the GNU General Public License,
# version 2 (GPLv2). There is NO WARRANTY for this software, express or
# implied, including the implied warranties of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. You should have received a copy of GPLv2
# along with this software; if not, see
# http://www.gnu.org/licenses/old-licenses/gpl-2.0.txt.
#
# Red Hat trademarks are not licensed under GPLv2. No permission is
# granted to use or replicate Red Hat trademarks that are incorporated
# in this software or its documentation.
#

"""
This module is an interface to syspurpose's SyncedStore class from subscription-manager.
It contains methods for accessing/manipulating the local syspurpose.json metadata file through SyncedStore.
"""

from rhsm.connection import ConnectionException
from subscription_manager import certlib
from subscription_manager import injection as inj

import logging
import json
import os
log = logging.getLogger(__name__)

try:
    from syspurpose.sync import sync
except ImportError:
    def sync(uep, consumer_uuid, command=None, report=None):
        log.debug("Syspurpose module unavailable, not syncing")
        return read_syspurpose()

try:
    from syspurpose.files import SyncedStore, USER_SYSPURPOSE
except ImportError:
    log.debug("Could not import from module syspurpose.")
    SyncedStore = None
    USER_SYSPURPOSE = "/etc/rhsm/syspurpose/syspurpose.json"

store = None
syspurpose = None


def save_sla_to_syspurpose_metadata(service_level):
    """
    Saves the provided service-level value to the local Syspurpose Metadata (syspurpose.json) file.
    If the service level provided is null or empty, the sla value to the local syspurpose file is set to null.

    :param service_level: The service-level value to be saved in the syspurpose file.
    :type service_level: str
    """

    if 'SyncedStore' in globals() and SyncedStore is not None:
        store = SyncedStore(None)

        # if empty, set it to null
        if service_level is None or service_level == "":
            service_level = None

        store.set("service_level_agreement", service_level)
        store.finish()
        log.debug("Syspurpose SLA value successfully saved locally.")
    else:
        log.error("SyspurposeStore could not be imported. Syspurpose SLA value not saved locally.")


def get_sys_purpose_store():
    """
    :return: Returns a singleton instance of the syspurpose store if it was imported.
             Otherwise None.
    """
    global store
    if store is not None:
        return store
    elif SyncedStore is not None:
        uep = inj.require(inj.CP_PROVIDER).get_consumer_auth_cp()
        uuid = inj.require(inj.IDENTITY).uuid
        store = SyncedStore(uep, consumer_uuid=uuid)
    return store


def read_syspurpose(raise_on_error=False):
    """
    Reads the system purpose from the correct location on the file system.
    Makes an attempt to use a SyspurposeStore if available falls back to reading the json directly.
    :return: A dictionary containing the total syspurpose.
    """
    if SyncedStore is not None:
        try:
            syspurpose = SyncedStore(None).get_local_contents()
        except (OSError, IOError):
            syspurpose = {}
    else:
        try:
            syspurpose = json.load(open(USER_SYSPURPOSE))
        except (os.error, ValueError, IOError):
            # In the event this file could not be read treat it as empty
            if raise_on_error:
                raise
            syspurpose = {}
    return syspurpose


def write_syspurpose(values):
    """
    Write the syspurpose to the file system.
    :param values:
    :return:
    """
    if SyncedStore is not None:
        sp = SyncedStore(None)
        sp.update_local(values)
    else:
        # Simple backup in case the syspurpose tooling is not installed.
        try:
            json.dump(values, open(USER_SYSPURPOSE), ensure_ascii=True, indent=2)
        except OSError:
            log.warning('Could not write syspurpose to %s' % USER_SYSPURPOSE)
            return False
    return True


class SyspurposeSyncActionInvoker(certlib.BaseActionInvoker):
    """
    Used by rhsmcertd to sync the syspurpose values locally with those from the Server.
    """

    def _do_update(self):
        action = SyspurposeSyncActionCommand()
        return action.perform()


class SyspurposeSyncActionReport(certlib.ActionReport):
    name = "Syspurpose Sync"

    def record_change(self, change):
        """
        Records the change detected by the three_way_merge function into a record in the report.
        :param change: A util.DiffChange object containing the recorded changes.
        :return: None
        """
        if change.source == 'remote':
            source = 'Entitlement Server'
        elif change.source == 'local':
            source = USER_SYSPURPOSE
        else:
            source = 'cached system purpose values'
        msg = None
        if change.in_base and not change.in_result:
            msg = "'{key}' removed by change from {source}".format(key=change.key,
                                                                     source=source)
        elif not change.in_base and change.in_result:
            msg = "'{key}' added with value '{value}' from change in {source}".format(
                    key=change.key, value=change.new_value, source=source
            )
        elif change.in_base and change.previous_value != change.new_value:
            msg = "'{key}' updated from '{old_value}' to '{new_value}' due to change in {source}"\
                .format(key=change.key, new_value=change.new_value,
                        old_value=change.previous_value, source=source)

        if msg:
            self._updates.append(msg)

    """
    The base method formatting does not fit the massages we are seeing here
    BZ #1789457
    """
    def format_exceptions(self):
        buf = ''
        for e in self._exceptions:
            buf += str(e).strip()
            buf += '\n'
        return buf


class SyspurposeSyncActionCommand(object):
    """
    Sync the system purpose values, by performing a three-way merge between:
      - The last known shared state (SyspurposeCache)
      - The current values on the server
      - The current values on the file system
    """

    def __init__(self):
        self.report = SyspurposeSyncActionReport()
        self.cp_provider = inj.require(inj.CP_PROVIDER)
        self.uep = self.cp_provider.get_consumer_auth_cp()

    def perform(self, include_result=False):
        """
        Perform the action that this Command represents.
        :return:
        """
        result = {}
        consumer_uuid = inj.require(inj.IDENTITY).uuid

        try:
            store = SyncedStore(uep=self.uep,
                                        consumer_uuid=consumer_uuid,
                                        report=self.report,
                                        on_changed=self.report.record_change)
            result = store.sync()
        except ConnectionException as e:
            self.report._exceptions.append('Unable to sync syspurpose with server: %s' % str(e))
            self.report._status = 'Failed to sync system purpose'
        self.report._updates = "\n\t\t ".join(self.report._updates)
        log.debug("Syspurpose updated: %s" % self.report)
        if not include_result:
            return self.report
        else:
            return self.report, result