Commit 98e760ca authored by Elena Grandi's avatar Elena Grandi
Browse files

Validate mac/ip/hostnames received by the stores

parent 6960cef0
......@@ -3,6 +3,7 @@ import logging
import re
from ..config import Config
from .. import utils
"""
Data sources for the fuss-manager.
......@@ -51,12 +52,6 @@ class DataSource:
class MachineDataSourceMixin:
"""
"""
re_validate_mac = re.compile(r'^(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$')
re_validate_hostname = re.compile(
# Each component cannot start or end with a dash
r"^(?:(?:[a-z0-9]|[a-z0-9][a-z0-9-]*[a-z0-9])\.)*"
# Hostname cannot end with a dot
r"(?:[a-z0-9]|[a-z0-9][a-z0-9-]*[a-z0-9])$", flags=re.I)
def __init__(self, *args, **kw):
super().__init__(*args, **kw)
......@@ -80,24 +75,19 @@ class MachineDataSourceMixin:
"""
Check that a string looks like a macaddress
"""
return bool(self.re_validate_mac.match(s))
raise DeprecationWarning("Use utils.check_mac instead.")
return utils.check_mac(s)
def check_ip(self, s):
"""
Check that a string looks like an ipv4 address
"""
try:
ipaddress.IPv4Address(s)
except ipaddress.AddressValueError:
return False
return True
raise DeprecationWarning("Use utils.check_ip instead.")
return utils.check_ip(s)
def check_hostname(self, s):
"""
Check that a string looks like an hostname.
"""
if not s:
return False
if not self.re_validate_hostname.match(s):
return False
return not self.check_ip(s)
raise DeprecationWarning("Use utils.check_hostname instead.")
return utils.check_hostname(s)
......@@ -10,7 +10,7 @@ import time
import ruamel.yaml
import aiodns
from . import events
from . import events, utils
"""
Data store for the fuss-manager.
......@@ -349,7 +349,9 @@ class MachineStore:
out = await proc.communicate()
for line in out[0].decode().split('\n'):
if line and mac in line:
return line.split()[0]
ip = line.split()[0]
if utils.check_ip(ip):
return line.split()[0]
return None
async def ip_to_mac(self, ip):
......@@ -362,7 +364,9 @@ class MachineStore:
)
out = await proc.communicate()
if ip in out[0].decode().split('\n')[1]:
return out[0].decode().split('\n')[1][33:50]
mac = out[0].decode().split('\n')[1][33:50]
if utils.check_mac(mac):
return mac
return None
async def ip_to_name(self, ip):
......@@ -373,6 +377,8 @@ class MachineStore:
name = socket.gethostbyaddr(ip)[0]
except socket.herror:
name = None
if not utils.check_hostname(name):
name = None
# TODO: aiodns 1.2.0 adds a gethostbyaddr which can be used in
# the following way
# res = await self.resolver.gethostbyaddr(ip)
......@@ -384,7 +390,10 @@ class MachineStore:
Find the ip of a machine with a known fqdn.
"""
res = await self.resolver.gethostbyname(name, socket.AF_INET)
return res.addresses[0]
ip = res.addresses[0]
if utils.check_ip:
return ip
return None
async def is_registered(self, name):
"""
......
import ipaddress
import re
"""
Common data validation functions
"""
re_validate_mac = re.compile(r'^(?:[0-9A-Fa-f]{2}:){5}[0-9A-Fa-f]{2}$')
re_validate_hostname = re.compile(
# Each component cannot start or end with a dash
r"^(?:(?:[a-z0-9]|[a-z0-9][a-z0-9-]*[a-z0-9])\.)*"
# Hostname cannot end with a dot
r"(?:[a-z0-9]|[a-z0-9][a-z0-9-]*[a-z0-9])$", flags=re.I
)
def check_mac(s):
"""
Check that a string looks like a macaddress
"""
return bool(re_validate_mac.match(s))
def check_ip(s):
"""
Check that a string looks like an ipv4 address
"""
try:
ipaddress.IPv4Address(s)
except ipaddress.AddressValueError:
return False
return True
def check_hostname(s):
"""
Check that a string looks like an hostname.
"""
if not s:
return False
if not re_validate_hostname.match(s):
return False
return not check_ip(s)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment