Alerting to many different services using Python and sudo 1.9

Before version 1.9 was released, alterting in sudo was limited to e-mail messages. If you wanted to send alerts somewhere else, like Slack, you could only do this using external applications, like syslog-ng. Beginning with sudo 1.9, there is an Audit API that can be called from Python.

Previously, we provided you with a simple example that show how to print some debug information to the terminal. In this blog post we will extend that example with Apprise, a universal Python notification library. By integrating with Apprise, you can easily send alerts to dozens of different messaging services directly from sudo itself.

Note: the script below is provided for inspiration only, it is not production-ready code. The URL is hard coded in the Python code, alerts are sent for all sudo sessions and there is no error handling whatsoever.

Before you begin

To access the Audit API from Python you need to have sudo 1.9.1 or later installed with Python support enabled. Many enterprise Linux distributions (as well as some others) still only provide sudo 1.8. OpenSUSE Tumbleweed was the first to enable Python support in sudo and Fedora followed a few months later. On FreeBSD you must compile sudo from ports to enable Python. The sudo project also provides binaries for a large number of operating systems: https://www.sudo.ws/download.html#binary. For others, you need to compile sudo yourself.

You will also need Apprise. With a bit of luck, it may be available as a package on your operating system. For example, on Fedora:

dnf install apprise

For others you can install it using pip:

pip3 install apprise

A few things to consider

My first iteration of Apprise integration was really simple. In the __init__() section I initialized Apprise and I sent a notification for each and every audit message as it arrived by appending two lines to the script’s _log() method. While this worked, it was slow. I did my testing using Discord, and sending alerts there is limited to one message per second. Each session generates five or more messages, which meant waiting for five seconds before receiving a command prompt from sudo.

The next step was to aggregate log messages and send everything at the end. This works perfectly for short-lived commands, like ls or mount, but not so well when you reboot your host or start an interactive session using sudo -s.

So, while it is still possible to send each log message separately, the code by default sends logs in one or two batches. If the sudo session is approved, then the first batch is sent before the command is executed. Otherwise all logs are sent at the end.

Many notification services supported by Apprise have a length limit in place. While most of the log messages are relatively short, the environment might fill multiple screens (for example on RHEL/CentOS). I commented out the line showing the environment to keep the messages small.

The Python code

My code is based on the sudo Audit API sample Python code and only has a few modifications. Changes to the original code are marked with bold text.

You can copy and paste the code below to a file and save it on your system. As it only needs to be readable by root, for simplicity I have saved it in the /root/ directory. Make sure that the name is not used by another script already on your system in PYTHONPATH as that will have precedence and loading your code from sudo will fail.


import sudo

import os

import apprise

VERSION = 1.0


class SudoAuditPlugin(sudo.Plugin):
    def __init__(self, plugin_options, user_info, **kwargs):
        # For loading multiple times, an optional "Id" can be specified
        # as argument to identify the log lines
        self.apobj = apprise.Apprise()
        self.apobj.add('https://discord.com/api/webhooks/xxx/yyy')
        self.message = ''
        self.approved = False
        self.sync = sudo.options_as_dict(plugin_options).get("sync", "no")
        plugin_id = sudo.options_as_dict(plugin_options).get("Id", "")
        self._log_line_prefix = "(AUDIT{}) ".format(plugin_id)

        user_info_dict = sudo.options_as_dict(user_info)
        user = user_info_dict.get("user", "???")
        uid = user_info_dict.get("uid", "???")
        self._log("-- Started by user {} ({}) -- ".format(user, uid))

    def __del__(self):
        self._log("-- Finished --")

    def open(self, submit_optind: int, submit_argv: tuple) -> int:
        # To cut out the sudo options, use "submit_optind":
        program_args = submit_argv[submit_optind:]
        if program_args:
            self._log("Requested command: " + " ".join(program_args))

    def accept(self, plugin_name, plugin_type,
               command_info, run_argv, run_envp) -> int:
        info = sudo.options_as_dict(command_info)
        cmd = list(run_argv)
        cmd[0] = info.get("command")
        self._log("Accepted command: {}".format(" ".join(cmd)))
        self._log("  By the plugin: {} (type={})".format(
            plugin_name, self.__plugin_type_str(plugin_type)))
        if plugin_type == sudo.PLUGIN_TYPE.SUDO:
            self.approved = True
#        self._log("  Environment: " + " ".join(run_envp))


    def reject(self, plugin_name, plugin_type, audit_msg, command_info) -> int:
        self._log("Rejected by plugin {} (type={}): {}".format(
            plugin_name, self.__plugin_type_str(plugin_type), audit_msg))

    def error(self, plugin_name, plugin_type, audit_msg, command_info) -> int:
        self._log("Plugin {} (type={}) got an error: {}".format(
            plugin_name, self.__plugin_type_str(plugin_type), audit_msg))

    def close(self, status_kind: int, status: int) -> None:
        if status_kind == sudo.EXIT_REASON.NO_STATUS:
            self._log("The command was not executed")

        elif status_kind == sudo.EXIT_REASON.WAIT_STATUS:
            if os.WIFEXITED(status):
                self._log("Command returned with exit code "
                          "{}".format(os.WEXITSTATUS(status)))
            elif os.WIFSIGNALED(status):
                self._log("Command exited due to signal "
                          "{}".format(os.WTERMSIG(status)))
            else:
                raise sudo.PluginError("Failed to understand wait exit status")

        elif status_kind == sudo.EXIT_REASON.EXEC_ERROR:
            self._log("Sudo has failed to execute the command, "
                      "execve returned {}".format(status))

        elif status_kind == sudo.EXIT_REASON.SUDO_ERROR:
            self._log("Sudo has run into an error: {}".format(status))

        else:
            raise Exception("Command returned unknown status kind {}".format(
                status_kind))

    def show_version(self, is_verbose: bool) -> int:
        version_str = " (version=1.0)" if is_verbose else ""
        sudo.log_info("Python Example Audit Plugin" + version_str)

    def _log(self, string):
        # For the example, we just log to output (this could be a file)
        sudo.log_info(self._log_line_prefix, string)
        self.message = self.message + string + '\n'
        if self.sync == "yes":
            self.apobj.notify(body=self.message,title='sudo',)
            self.message=''
        else:
            if self.approved:
                self.apobj.notify(body=self.message,title='sudo',)
                self.message=''
                self.approved = False
            if string == '-- Finished --':
                self.apobj.notify(body=self.message,title='sudo',)

    @staticmethod
    def __plugin_type_str(plugin_type):
        return sudo.PLUGIN_TYPE(plugin_type).name

There is a new import at the beginning to use the Apprise library. In the __init__() method we do a couple of initializations–make sure that the URL is valid according to the Apprise documentation. In the accept() method we set a variable to True if the sudo session was accepted. There can be multiple accepts by different plugins; of those, the one from the sudo front-end is sent immediately before a command is executed.

The most complicated logic is present in the _log() method. I left the original logging code there–you can safely comment it out. If we set “sync” to “yes” in sudo.conf (see below) then each log message will be sent separately. Otherwise, log messages are aggregated. They are sent once self.approved is set to True or when the last line containing a fixed text arrives.

Configuring sudo

You can enable the script above by adding a few lines to /etc/sudo.conf:

Plugin python_audit python_plugin.so \
    ModulePath=/root/example_audit_plugin.py \
    ClassName=SudoAuditPlugin Id=666 sync=no

Change the file name as necessary for your own environment. You can send logs immediately instead of in batches if you change the value of “sync” to “yes”.

Testing

Testing is easy. First, you need to setup a notification service. In my case I used Discord. If you want to be extra sure, you can test Apprise from the command line. In case of Discord, my test command was something like this (the WebHook URL is anonymized):

[root@localhost ~]# apprise -vv -t 'sudo' -b 'czanik became root' 'https://discord.com/api/webhooks/xxx/yyy'
2021-04-14 15:43:23,875 - INFO - Notifying 1 service(s) asynchronously.
2021-04-14 15:43:24,093 - INFO - Sent Discord notification.
[root@localhost ~]# 

Testing from sudo is even easier. Just run sudo and check Discord (or whatever notification service you configured):

czanik@linux-pgsb:~> ls /root/bla
ls: cannot access '/root/bla': Permission denied
czanik@linux-pgsb:~> sudo ls /root/bla
(AUDIT666)  -- Started by user czanik (1000) -- 
(AUDIT666)  Requested command: ls /root/bla
(AUDIT666)  Accepted command: /usr/bin/ls /root/bla
(AUDIT666)    By the plugin: sudoers_policy (type=POLICY)
(AUDIT666)  Accepted command: /usr/bin/ls /root/bla
(AUDIT666)    By the plugin: python_approval (type=APPROVAL)
(AUDIT666)  Accepted command: /usr/bin/ls /root/bla
(AUDIT666)    By the plugin: sudo (type=SUDO)
ls: cannot access '/root/bla': No such file or directory
(AUDIT666)  Command returned with exit code 2
(AUDIT666)  -- Finished --
czanik@linux-pgsb:~> 

The output in Discord is pretty similar, though there are some differences. There is no prefix in front of the log lines, and the output of the sudo session is not shown:

sudo
-- Started by user czanik (1000) -- 
Requested command: ls /root/bla
Accepted command: /usr/bin/ls /root/bla
  By the plugin: sudoers_policy (type=POLICY)
Accepted command: /usr/bin/ls /root/bla
  By the plugin: python_approval (type=APPROVAL)
Accepted command: /usr/bin/ls /root/bla
  By the plugin: sudo (type=SUDO)
Command returned with exit code 2
sudo
-- Finished –

Next steps

As mentioned in the introduction, this is just sample code to get you started. On the other hand, hopefully it whets your appetite and now you want more. Here are a couple of ideas of where to go next:

  • make the URL configurable from sudo.conf
  • add at least minimal error handling (syslog message if sending message failed)
  • add conditions to control when notification are sent

If you would like to be notified about new posts and sudo news, sign up for the sudo blog announcement mailing list.