lwzm
3/12/2015 - 11:38 AM

RouterOS API

RouterOS API

#!/usr/bin/env python3

import codecs
import hashlib
import threading
import socket
import shlex


class RosApi:
    def __init__(self, socket):
        self._socket = socket

    def read_bytes(self, n):
        b = b''
        while n > 0:
            tmp = self._socket.recv(n)
            n -= len(tmp)
            b += tmp
        return b

    def write_bytes(self, b):
        self._socket.send(b)

    def read_int(self):
        byteorder = "big"
        n, = self.read_bytes(1)
        if not n & 0b10000000:
            mask, more = 0b01111111, 0
        elif not n & 0b01000000:
            mask, more = 0b00111111, 1
        elif not n & 0b00100000:
            mask, more = 0b00111111, 2
        elif not n & 0b00010000:
            mask, more = 0b00011111, 3
        else:
            mask, more = 0b00001111, 4
        return int.from_bytes(
            (n & mask).to_bytes(1, byteorder) + self.read_bytes(more), byteorder)

    def write_int(self, n):
        byteorder = "big"
        if n < 0x80:
            b = n.to_bytes(1, byteorder)
        elif n < 0x4000:
            b = (n | 0x8000).to_bytes(2, byteorder)
        elif n < 0x200000:
            b = (n | 0xC00000).to_bytes(3, byteorder)
        elif n < 0x10000000:        
            b = (n | 0xE0000000).to_bytes(4, byteorder)
        else:                       
            b = b'\xF0' + n.to_bytes(4, byteorder)
        self.write_bytes(b)

    def read_word(self):
        return self.read_bytes(self.read_int()).decode()

    def write_word(self, s):
        bs = s.encode()
        self.write_int(len(bs))
        self.write_bytes(bs)

    def talk(self, *words):
        for w in words:
            self.write_word(w)
        self.write_bytes(b'\x00')

    def listen(self):
        words = []
        while True:
            w = self.read_word()
            if not w:
                break
            words.append(w)
        return words

    def login(self, name="admin", password=""):
        self.talk("/login")
        done, msg = self.listen()
        *_, challenge = msg.split("=")
        assert done == "!done", done
        md5 = hashlib.md5(b'\x00' + password.encode() + codecs.decode(challenge, "hex")).hexdigest()
        self.talk("/login", "=name={}".format(name), "=response=00{}".format(md5))
        reply = self.listen()
        assert reply == ["!done"], reply


def test():
    class Socket():
        def __init__(self, data):
            self._data = data
            self._idx = 0

        def send(self, b):
            print(codecs.encode(b, "hex"))

        def recv(self, n):
            #n = 1
            i = self._idx
            assert i < len(self._data)
            self._idx = i + n
            return self._data[i:i + n]

    api = RosApi(Socket(b'\x7F' b'\xBF\xFF' b'\xCC'))
    api.write_int(1)
    api.write_int(127), api.write_int(128)
    api.write_int(0x3FFF), api.write_int(0x4000)
    api.write_int(0x1FFFFF), api.write_int(0x200000)
    api.write_int(0x0FFFFFFF), api.write_int(0x10000000)

    api = RosApi(Socket(
        b'\x7F' b'\x80\x80'
        b'\xBF\xFF' b'\xC0\x40\x00'
        b'\xDF\xFF\xFF' b'\xE0\x20\x00\x00'
        b'\xEF\xFF\xFF\xFF' b'\xF0\x10\x00\x00\x00'
        ))
    assert api.read_int() == 127
    assert api.read_int() == 128
    assert api.read_int() == 0x3FFF
    assert api.read_int() == 0x4000
    assert api.read_int() == 0x1FFFFF
    assert api.read_int() == 0x200000
    assert api.read_int() == 0x0FFFFFFF
    assert api.read_int() == 0x10000000


def main():
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    s.settimeout(1)
    s.connect(("192.168.0.21", 8728))
    api = RosApi(s)
    api.login('q', '')

    run = True

    def show_reply():
        while run:
            try:
                command, *attributes = api.listen()
            except socket.timeout:
                continue
            print(command)
            for i in attributes:
                print("    " + i)
            print()

    threading.Thread(target=show_reply).start()


    try:
        words = []
        while True:
            tmp = input()
            if tmp:
                words.append(tmp)
            else:
                api.talk(*shlex.split(" ".join(words)))
                words.clear()
    finally:
        run = False


if __name__ == "__main__":
    main()