jetz
8/26/2014 - 3:02 AM

Some Python clients for RADIUS, e.g. FreeRADIUS

Some Python clients for RADIUS, e.g. FreeRADIUS

#!/usr/bin/env python
'''
Extremely basic RADIUS authentication. Bare minimum required to authenticate
a user, yet remain RFC2138 compliant (I hope).

Homepage at http://github.com/btimby/py-radius/
'''
# Copyright (c) 1999, Stuart Bishop <zen@shangri-la.dropbear.id.au>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
#     Redistributions of source code must retain the above copyright
#     notice, this list of conditions and the following disclaimer.
#
#     Redistributions in binary form must reproduce the above copyright
#     notice, this list of conditions and the following disclaimer in the
#     documentation and/or other materials provided with the
#     distribution.
#
#     The name of Stuart Bishop may not be used to endorse or promote
#     products derived from this software without specific prior written
#     permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE REGENTS OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from struct import pack
from select import select
from random import randint
try:
    from hashlib import md5
except ImportError:
    from md5 import new as md5
import socket

__version__ = '1.0.3'

# Constants
# RADIUS packet codes; the code is the first octet of a packet and
# ACCESS_ACCEPT is what a reply's first octet is compared against.
ACCESS_REQUEST = 1
ACCESS_ACCEPT = 2
ACCESS_REJECT = 3

# Initial values for RADIUS.retries (how many times the request is sent)
# and RADIUS.timeout (passed to select(), i.e. seconds to wait per attempt).
DEFAULT_RETRIES = 3
DEFAULT_TIMEOUT = 5


class Error(Exception):
    '''Base class for the exceptions defined by this module.'''
    pass


class NoResponse(Error):
    '''Raised by RADIUS.authenticate when every attempt ends without a
       response that passes validation.'''
    pass


class SocketError(NoResponse):
    '''Wraps a socket.error raised while talking to the server or while
       closing the socket.'''
    pass


def authenticate(username, password, secret, host='radius', port=1812):
    '''One-shot convenience wrapper around RADIUS.authenticate.

       Builds a throw-away client for (secret, host, port) and returns
       whatever its authenticate() method returns: 1 when the credentials
       are accepted, 0 otherwise.

       NoResponse or SocketError raised by the client propagate unchanged.'''

    return RADIUS(secret, host, port).authenticate(username, password)


class RADIUS(object):
    '''Minimal RADIUS client that authenticates a user with the
       User-Password (PAP) attribute over UDP.

       The shared secret, user name and password may be given as byte
       strings or as text; text is encoded as UTF-8 before use.

       Public attributes:
           retries -- how many times the request is sent before giving up
           timeout -- seconds to wait for a reply after each send'''

    def __init__(self, secret, host='radius', port=1812):
        self._secret = secret
        self._host = host
        self._port = port

        self.retries = DEFAULT_RETRIES
        self.timeout = DEFAULT_TIMEOUT
        self._socket = None

    def __del__(self):
        # A finalizer cannot usefully report errors, so closing here is
        # strictly best effort.
        try:
            self.closesocket()
        except Exception:
            pass

    @staticmethod
    def _to_bytes(value):
        '''Return value as a byte string, encoding text as UTF-8.'''
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return value.encode('utf-8')

    def opensocket(self):
        '''Create and connect the UDP socket unless one is already open.'''
        if self._socket is None:
            self._socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            self._socket.connect((self._host, self._port))

    def closesocket(self):
        '''Close the socket if one is open.

           Raises SocketError if closing fails. The reference is dropped
           first so a failed close never leaves a stale socket behind.'''
        sock = getattr(self, '_socket', None)
        self._socket = None
        if sock is not None:
            try:
                sock.close()
            except socket.error as x:
                raise SocketError(x)

    def generateAuthenticator(self):
        '''Return a 16 byte random string for use as Request Authenticator.

           The value comes from os.urandom because the authenticator is
           the only randomness protecting the hidden password.'''
        import os
        return os.urandom(16)

    def radcrypt(self, authenticator, text):
        '''Encrypt a password with the secret.

           The password is padded with NUL octets to a multiple of 16 (an
           empty password becomes one all-NUL block). Each 16 octet block
           is XORed with md5(secret + previous), where previous is the
           authenticator for the first block and the preceding cipher
           block afterwards.

           Raises Exception if the password is longer than 128 bytes.'''
        text = self._to_bytes(text)
        if len(text) > 128:
            raise Exception('Password exceeds maximun of 128 bytes')
        secret = self._to_bytes(self._secret)

        # Pad only up to the next block boundary; a password that already
        # fills whole blocks must not gain an extra one.
        text += b'\x00' * (-len(text) % 16)
        if not text:
            text = b'\x00' * 16

        blocks = []
        last = self._to_bytes(authenticator)
        for offset in range(0, len(text), 16):
            key = bytearray(md5(secret + last).digest())
            chunk = bytearray(text[offset:offset + 16])
            # The cipher block just produced keys the next iteration.
            last = bytes(bytearray(k ^ c for k, c in zip(key, chunk)))
            blocks.append(last)
        return b''.join(blocks)

    def authenticate(self, uname, passwd):
        '''Attempt to authenticate with the given username and password.
           Returns 0 on failure
           Returns 1 on success
           Raises a NoResponse (or its subclass SocketError) exception if
                no responses or no valid responses are received'''

        secret = self._to_bytes(self._secret)
        uname = self._to_bytes(uname)

        try:
            self.opensocket()
            ident = randint(0, 255)

            authenticator = self.generateAuthenticator()

            encpass = self.radcrypt(authenticator, passwd)

            msg = pack('!B B H 16s B B %ds B B %ds'
                       % (len(uname), len(encpass)),
                       ACCESS_REQUEST, ident,
                       # Length of entire message: 20 byte header plus a
                       # 2 byte header for each of the two attributes.
                       len(uname) + len(encpass) + 24,
                       authenticator,
                       1, len(uname) + 2, uname,
                       2, len(encpass) + 2, encpass)

            for _ in range(0, self.retries):
                self._socket.send(msg)

                readable = select([self._socket, ], [], [], self.timeout)[0]
                if not readable:
                    continue
                response = self._socket.recv(4096)

                # Anything shorter than a RADIUS header cannot be a reply.
                if len(response) < 20:
                    continue

                code, response_id = bytearray(response[:2])
                if response_id != ident:
                    continue

                # Verify the packet is not a cheap forgery or corrupt
                checkauth = response[4:20]
                m = md5(response[0:4] + authenticator + response[20:]
                        + secret).digest()

                if m != checkauth:
                    continue

                if code == ACCESS_ACCEPT:
                    return 1
                return 0

        except socket.error as x:
            # Drop the broken socket; the original error is what matters,
            # so a failure while closing is deliberately ignored.
            try:
                self.closesocket()
            except SocketError:
                pass
            raise SocketError(x)

        raise NoResponse

# Don't break code written for radius.py distributed with the ZRadius
# Zope product
Radius = RADIUS

if __name__ == '__main__':
    # Smoke test against a RADIUS server on localhost.

    secret = 'testing123'
    host = '127.0.0.1'
    port = 1812

    r = Radius(secret, host, port)

    uname = 'username'
    passwd = 'password'
    # Parenthesised single-argument print behaves the same as the print
    # statement on Python 2 and is valid syntax on Python 3.
    if r.authenticate(uname, passwd):
        print("Authentication Succeeded")
    else:
        print("Authentication Failed")
#!/usr/bin/env python
# encoding: utf-8

from StringIO import StringIO

from pyrad.client import Client
from pyrad.dictionary import Dictionary
from pyrad.packet import (AccessRequest,
                          AccessAccept,
                          AccessReject)

# Inline RADIUS attribute dictionary. It is wrapped in a StringIO and handed
# to pyrad's Dictionary below; each row is: ATTRIBUTE <name> <code> <type>.
DICTIONARY = u"""
ATTRIBUTE   User-Name                1   string
ATTRIBUTE   User-Password            2   string
ATTRIBUTE   CHAP-Password            3   octets
ATTRIBUTE   NAS-IP-Address           4   ipaddr
ATTRIBUTE   NAS-Port                 5   integer
ATTRIBUTE   Service-Type             6   integer
ATTRIBUTE   Framed-Protocol          7   integer
ATTRIBUTE   Framed-IP-Address        8   ipaddr
ATTRIBUTE   Framed-IP-Netmask        9   ipaddr
ATTRIBUTE   Framed-Routing           10  integer
ATTRIBUTE   Filter-Id                11  string
ATTRIBUTE   Framed-MTU               12  integer
ATTRIBUTE   Framed-Compression       13  integer
ATTRIBUTE   Login-IP-Host            14  ipaddr
ATTRIBUTE   Login-Service            15  integer
ATTRIBUTE   Login-TCP-Port           16  integer
ATTRIBUTE   Reply-Message            18  string
ATTRIBUTE   Callback-Number          19  string
ATTRIBUTE   Callback-Id              20  string
ATTRIBUTE   Framed-Route             22  string
ATTRIBUTE   Framed-IPX-Network       23  ipaddr
ATTRIBUTE   State                    24  octets
ATTRIBUTE   Class                    25  octets
ATTRIBUTE   Vendor-Specific          26  octets
ATTRIBUTE   Session-Timeout          27  integer
ATTRIBUTE   Idle-Timeout             28  integer
ATTRIBUTE   Termination-Action       29  integer
ATTRIBUTE   Called-Station-Id        30  string
ATTRIBUTE   Calling-Station-Id       31  string
ATTRIBUTE   NAS-Identifier           32  string
ATTRIBUTE   Proxy-State              33  octets
ATTRIBUTE   Login-LAT-Service        34  string
ATTRIBUTE   Login-LAT-Node           35  string
ATTRIBUTE   Login-LAT-Group          36  octets
ATTRIBUTE   Framed-AppleTalk-Link    37  integer
ATTRIBUTE   Framed-AppleTalk-Network 38 integer
ATTRIBUTE   Framed-AppleTalk-Zone    39  string
"""

if __name__ == '__main__':
    # Smoke test: send one Access-Request to a RADIUS server on localhost
    # through pyrad and report the outcome.
    secret = 'testing123'
    host = '127.0.0.1'
    port = 1812

    cli = Client(server=host, secret=secret,
                 dict=Dictionary(StringIO(DICTIONARY)))

    req = cli.CreateAuthPacket(code=AccessRequest,
                               User_Name="username",
                               NAS_Identifier="localhost")
    req["User-Password"] = req.PwCrypt("password")

    reply = cli.SendPacket(req)

    # Parenthesised single-argument print is valid on Python 2 and 3.
    if reply.code == AccessAccept:
        print("access accepted")
    elif reply.code == AccessReject:
        print("access denied")
    else:
        # Any other reply code used to fall through without any output.
        print("unexpected reply code: %s" % reply.code)
#!/usr/bin/env python
# encoding: utf-8

###########################################################
# Author: Simon Engledew
# Source: https://github.com/simon-engledew/python-radius
###########################################################

import os
import hmac
import socket
import select
import struct
import hashlib
import itertools
from contextlib import closing, contextmanager


def join(chunks):
    """Concatenate an iterable of string chunks into a single string."""
    empty = ''
    return empty.join(chunks)


def lift(partial):
    """Build a decorator that post-processes a function's return value.

    ``lift(g)(f)`` returns a function that calls ``f`` with the given
    arguments and returns ``g(f(...))``. The wrapper keeps the wrapped
    function's name and docstring.
    """
    from functools import wraps

    def decorator(fn):
        @wraps(fn)
        def decorated(*args, **kwargs):
            return partial(fn(*args, **kwargs))
        return decorated
    return decorator


class Pair(object):
    """One RADIUS attribute: a numeric type code plus its raw value.

    On the wire an attribute is a one-octet code, a one-octet length that
    counts the two header octets as well, and then the value.
    """
    Head = '!B B'
    HeadSize = struct.calcsize(Head)

    def __init__(self, code, value):
        self.code = code
        self.value = value

    def __str__(self):
        return '{0}={1}'.format(self.code, self.value)

    @classmethod
    def unpack(cls, data):
        """Yield the attributes encoded back to back in ``data``.

        Raises ValueError on a truncated header, on a length smaller than
        the header itself (a zero length would otherwise never advance the
        cursor and loop forever), or on a length that runs past the end of
        ``data``.
        """
        total = len(data)
        n = 0
        while n < total:
            if total - n < cls.HeadSize:
                raise ValueError('Truncated attribute header')
            code, length = struct.unpack_from(cls.Head, data, n)
            if length < cls.HeadSize or n + length > total:
                raise ValueError('Invalid attribute length')
            yield cls(code, data[n + cls.HeadSize:n + length])
            n += length

    def pack(self):
        """Return the wire encoding: code, total length, then the value."""
        return struct.pack(Pair.Head, self.code,
                           len(self.value) + Pair.HeadSize) + self.value


class Packet(object):
    """A RADIUS packet: code, identifier, authenticator and attributes.

    ``Head`` is the fixed header layout (code, id, length, authenticator).
    ``Tail`` is the layout of the Message-Authenticator attribute that
    ``pack`` appends after the caller's attributes.
    """
    Head, Tail = '!B B H 16s', '!B B 16s'
    HeadSize, TailSize = struct.calcsize(Head), struct.calcsize(Tail)

    def __init__(self, code, id, authenticator, *pairs):
        self.code = code
        self.id = id
        self.authenticator = authenticator
        self.pairs = pairs

    def __str__(self):
        # The pairs are objects, not strings, so render each one first.
        return 'Packet({0}, id={1})[{2}]'.format(
            self.code, self.id, ', '.join(str(pair) for pair in self.pairs))

    def __len__(self):
        """Return the size in bytes of the packet ``pack`` would produce.

        Returning an integer also keeps ``len()`` and truth tests on a
        packet from raising TypeError.
        """
        return (Packet.HeadSize + Packet.TailSize +
                sum(len(pair.pack()) for pair in self.pairs))

    @classmethod
    def unpack(cls, secret, data):
        """Parse ``data`` into a packet.

        ``secret`` is accepted for interface compatibility and is not
        used. Only the octets covered by the header's length field are
        parsed as attributes; anything after them is ignored.

        Raises ValueError if ``data`` is shorter than a header, if the
        length field is inconsistent with ``data``, or if an attribute is
        malformed.
        """
        if len(data) < cls.HeadSize:
            raise ValueError('Truncated packet')
        code, ident, length, authenticator = struct.unpack(
            cls.Head, data[:cls.HeadSize])
        if length < cls.HeadSize or length > len(data):
            raise ValueError('Invalid packet length')
        pairs = Pair.unpack(data[cls.HeadSize:length])
        return cls(code, ident, authenticator, *pairs)

    def pack(self, secret):
        """Return the wire form, signed with a Message-Authenticator.

        The HMAC-MD5 keyed with ``secret`` is computed over the whole
        packet with the Message-Authenticator value set to sixteen zero
        octets, and the zeros are then replaced by the digest.
        """
        pairs = b''.join(pair.pack() for pair in self.pairs)
        output = (struct.pack(Packet.Head,
                              self.code,
                              self.id,
                              Packet.HeadSize + Packet.TailSize + len(pairs),
                              self.authenticator) + pairs)
        placeholder = struct.pack(Packet.Tail,
                                  Radius.MESSAGE_AUTHENTICATOR,
                                  Packet.TailSize,
                                  b'\x00' * 16)
        # digestmod is passed explicitly: MD5 used to be the implicit
        # default, and newer Python versions require the argument.
        digest = hmac.new(secret, output + placeholder, hashlib.md5).digest()
        return output + struct.pack(Packet.Tail,
                                    Radius.MESSAGE_AUTHENTICATOR,
                                    Packet.TailSize,
                                    digest)


class Radius(object):
    """RADIUS client bound to an already connected datagram socket.

    Attributes:
        connection: the connected socket used for every exchange.
        secret: the shared secret as a byte string.
        timeout: seconds to wait for a reply after each send.
        retries: how many times a request is sent before giving up.
    """
    ACCESS_REQUEST = 1
    ACCESS_ACCEPT = 2
    ACCESS_REJECT = 3
    STATUS_SERVER = 12
    MESSAGE_AUTHENTICATOR = 80

    def __init__(self, connection, secret, timeout=3, retries=1):
        self.connection = connection
        self.secret = Radius._octets(secret)
        self.timeout = timeout
        self.retries = retries

    @staticmethod
    def _octets(value):
        """Return ``value`` as a byte string, encoding text as UTF-8."""
        if isinstance(value, (bytes, bytearray)):
            return bytes(value)
        return value.encode('utf-8')

    @staticmethod
    def digest(secret, authenticator, password):
        """Hide ``password`` for the User-Password attribute.

        The password is handled in 16 octet blocks, the last one padded
        with NULs (an empty password becomes one all-NUL block). Each
        block is XORed with md5(secret + previous), where previous is the
        authenticator for the first block and the preceding output block
        afterwards.

        Raises ValueError if the password is longer than 128 bytes.
        """
        secret = Radius._octets(secret)
        password = Radius._octets(password)
        # A real exception rather than assert, so the check survives -O.
        if len(password) > 128:
            raise ValueError('Password exceeds maximum length')

        blocks = []
        previous = authenticator
        for n in range(0, max(len(password), 1), 16):
            key = bytearray(hashlib.md5(secret + previous).digest())
            chunk = bytearray(password[n:n + 16].ljust(16, b'\x00'))
            previous = bytes(bytearray(k ^ c for k, c in zip(key, chunk)))
            blocks.append(previous)
        return b''.join(blocks)

    @staticmethod
    def authenticator():
        """Return 16 random octets from os.urandom."""
        return os.urandom(16)

    @classmethod
    @contextmanager
    def connect(cls, target, secret, timeout=3, retries=3):
        """Context manager yielding a client connected to ``target``.

        ``target`` is the (host, port) pair given to socket.connect. The
        UDP socket is closed when the block exits. ``timeout`` and
        ``retries`` are handed to the client so that they take effect.
        """
        with closing(socket.socket(socket.AF_INET, socket.SOCK_DGRAM)) as con:
            con.connect(target)

            yield cls(con, secret, timeout=timeout, retries=retries)

    def ping(self):
        """Send a Status-Server request; True if the reply is an accept.

        Prints the error and returns False when no reply arrives.
        """
        try:
            reply = self(Packet(Radius.STATUS_SERVER,
                                ord(os.urandom(1)),
                                Radius.authenticator())).code
            return reply == Radius.ACCESS_ACCEPT
        except IOError as e:
            print(e)
            return False

    def authenticate(self, username, password):
        """Return True if the server accepts ``username``/``password``.

        Any failure (no reply, invalid reply, over-long password, ...) is
        printed and reported as False.
        """
        authenticator = Radius.authenticator()

        try:
            p_username = Pair(1, Radius._octets(username))
            p_password = Pair(2, Radius.digest(self.secret,
                                               authenticator,
                                               password))
            reply = self(Packet(Radius.ACCESS_REQUEST,
                                ord(os.urandom(1)),
                                authenticator,
                                p_username,
                                p_password)).code

            return reply == Radius.ACCESS_ACCEPT
        except Exception as e:
            # Best effort by design: every error means "not authenticated".
            print(e)
            return False

    def __call__(self, outbound, timeout=None):
        """Send ``outbound`` and return the validated reply packet.

        ``timeout`` overrides the client's per-attempt timeout for this
        exchange. The request is sent up to ``self.retries`` times (at
        least once).

        Raises IOError if no reply arrives, and ValueError if the reply is
        truncated, carries a different id, or fails the response
        authenticator check.
        """
        if timeout is None:
            timeout = self.timeout
        payload = outbound.pack(self.secret)

        for attempt in range(max(1, self.retries)):
            self.connection.sendall(payload)
            r, w, x = select.select([self.connection], [], [], timeout)
            if self.connection in r:
                break
        else:
            raise IOError('No response from host')

        response = self.connection.recv(4096)
        if len(response) < Packet.HeadSize:
            raise ValueError('Truncated packet')
        inbound = Packet.unpack(self.secret, response)
        if inbound.id != outbound.id:
            raise ValueError('Invalid packet id')

        # The response authenticator is the MD5 of the reply with the
        # request authenticator in its place, followed by the secret.
        expected = hashlib.md5(response[0:4] +
                               outbound.authenticator +
                               response[20:] +
                               self.secret).digest()
        if response[4:20] != expected:
            raise ValueError('Illegal authenticator')
        return inbound

# Module-level shortcut for Radius.connect.
connect = Radius.connect

if __name__ == '__main__':
    # Smoke test against a RADIUS server on localhost.

    secret = 'testing123'
    host = '127.0.0.1'
    port = 1812

    # Use the port variable rather than repeating the literal, so editing
    # it above actually changes the target.
    with connect((host, port), secret) as connection:
        # Parenthesised single-argument print is valid on Python 2 and 3.
        print(connection.ping())
        print(connection.authenticate('username', 'password'))