2 つのシナリオがあり、どちらもSNMP.py
この回答で参照されています。
pysnmp (v4.2.3) および pysnmp-mibs (v0.1.4) :
>>> # pysnmp-mibs 0.1.4 and pysnmp 4.2.3
>>> from SNMP import v2c
>>> snmp = v2c('172.16.1.1', 'public')
>>> snmp.walk('ifName')
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "SNMP.py", line 115, in walk
(('', oid),),
File "/usr/local/lib/python2.6/dist-packages/pysnmp-4.2.3-py2.6.egg/pysnmp/entity/rfc3413/oneliner/cmdgen.py", line 449, in nextCmd
File "/usr/local/lib/python2.6/dist-packages/pysnmp-4.2.3-py2.6.egg/pysnmp/entity/rfc3413/oneliner/cmdgen.py", line 150, in makeReadVarBinds
File "/usr/local/lib/python2.6/dist-packages/pysnmp-4.2.3-py2.6.egg/pysnmp/entity/rfc3413/oneliner/mibvar.py", line 161, in resolveWithMib
File "/usr/local/lib/python2.6/dist-packages/pysnmp-4.2.3-py2.6.egg/pysnmp/smi/builder.py", line 295, in importSymbols
pysnmp.smi.error.SmiError: importSymbols: empty MIB module name
>>>
pysnmp (v4.2.2) および pysnmp-mibs (v0.1.3) :
>>> # After removing pysnmp / pysnmp-mibs / pyasn.1 and installing with:
>>> # easy_install pysnmp==4.2.2 pysnmp-mibs==0.1.3
>>> snmp = v2c('172.16.1.1', 'public')
>>> snmp.walk('ifName')
[SNMPObject(modName='IF-MIB', symName='ifName', index=1, value='Null0'), SNMPObject(modName='IF-MIB', symName='ifName', index=2, value='Internal-Data0/0'), SNMPObject(modName='IF-MIB', symName='ifName', index=3, value='Ethernet0/0'), SNMPObject(modName='IF-MIB', symName='ifName', index=4, value='Ethernet0/1'), SNMPObject(modName='IF-MIB', symName='ifName', index=5, value='Ethernet0/2'), SNMPObject(modName='IF-MIB', symName='ifName', index=11, value='Internal-Data0/1'), SNMPObject(modName='IF-MIB', symName='ifName', index=12, value='_internal_loopback'), SNMPObject(modName='IF-MIB', symName='ifName', index=13, value='Virtual254'), SNMPObject(modName='IF-MIB', symName='ifName', index=14, value='Vlan1'), SNMPObject(modName='IF-MIB', symName='ifName', index=15, value='OUTSIDE'), SNMPObject(modName='IF-MIB', symName='ifName', index=16, value='INSIDE')]
>>>
質問
pysnmp v4.2.3 を使用すると何が問題になりますか? 問題はSNMP.py
pysnmp ライブラリ内またはどこかにありますか?
更新 1
cmdgen.MibVariable(oid).loadMibs(),
Ilyaの提案どおりに使用すると、まだエラーが発生します...
pysnmp.smi.error.NoSuchObjectError: NoSuchObjectError({'str': "Can't resolve node name ::('ifName',) at <pysnmp.smi.view.MibViewController instance at 0x2af9e60>"})
このエラーを解決するにはどうすればよいですか? 次のSNMP.py
コードを使用しています。
from collections import namedtuple as NT
from datetime import datetime
import string
import re
from pysnmp.entity.rfc3413.oneliner import cmdgen
from pysnmp.smi import builder, view, error
from numpy import int64, float64
# NOTE!!!
# It is best to install the pysnmp-mibs package from pypi... this makes
# a lot of symbolic MIB names "just work"
# See this link below for many oneliner examples...
# http://pysnmp.sourceforge.net/examples/4.x/v3arch/oneliner/index.html
class v2c(object):
"""Build an SNMPv2c manager object"""
def __init__(self, ipaddr=None, device=None, community='Public',
retries=3, timeout=9):
self.device = device
self.ipaddr = ipaddr
self.community = community
self.SNMPObject = NT('SNMPObject', ['modName', 'datetime', 'symName',
'index', 'value'])
self.SNMPIndexed = NT('SNMPIndexed', ['modName', 'datetime', 'symName',
'index', 'value'])
self.query_timeout = float(timeout)/int(retries)
self.query_retries = int(retries)
self._index = None
self.cmdGen = cmdgen.CommandGenerator()
def index(self, oid=None):
"""Build an SNMP Manager index to reference in get or walk operations. First v2c.index('ifName'). Then, v2c.get_index('ifHCInOctets', 'eth0') or v2c.walk_index('ifHCInOctets'). Instead of referencing a numerical index, the index will refer to the value that was indexed."""
self._index = dict()
self._intfobj = dict()
snmpidx = self.walk(oid=oid)
for ii in snmpidx:
## the dicts below are keyed by the SNMP index number
# value below is the text string of the intf name
self._index[ii.index] = ii.value
# value below is the intf object
if not (self.device is None):
self._intfobj[ii.index] = self.device.find_match_intf(ii.value,
enforce_format=False)
def walk_index(self, oid=None):
"""Example usage, first index with v2c.index('ifName'), then v2c.get_index('ifHCInOctets', 'eth0')"""
if not (self._index is None):
tmp = list()
snmpvals = self.walk(oid=oid)
for idx, ii in enumerate(snmpvals):
tmp.append([ii.modName, datetime.now(), ii.symName,
self._index[ii.index], ii.value])
return map(self.SNMPIndexed._make, tmp)
else:
raise ValueError, "Must populate with SNMP.v2c.index() first"
def walk(self, oid=None):
if isinstance(self._format(oid), tuple):
errorIndication, errorStatus, errorIndex, \
varBindTable = self.cmdGen.nextCmd(
cmdgen.CommunityData('test-agent', self.community),
cmdgen.UdpTransportTarget((self.ipaddr, 161),
retries=self.query_retries,
timeout=self.query_timeout),
self._format(oid),
)
# Parsing only for now... no return value...
self._parse(errorIndication, errorStatus, errorIndex, varBindTable)
elif isinstance(oid, str):
errorIndication, errorStatus, errorIndex, \
varBindTable = self.cmdGen.nextCmd(
# SNMP v2
cmdgen.CommunityData('test-agent', self.community),
# Transport
cmdgen.UdpTransportTarget((self.ipaddr, 161)),
cmdgen.MibVariable(oid).loadMibs(),
)
return self._parse_resolve(errorIndication, errorStatus,
errorIndex, varBindTable)
else:
raise ValueError, "Unknown oid format: %s" % oid
def get_index(self, oid=None, index=None):
"""In this case, index should be similar to the values you indexed from... i.e. if you index with ifName, get_index('ifHCInOctets', 'eth0')"""
if not (self._index is None) and isinstance(index, str):
# Map the interface name provided in index to an ifName index...
snmpvals = None
for idx, value in self._index.items():
if index == value:
# if there is an exact match between the text index and the
# snmp index value...
snmpvals = self.get(oid=oid, index=idx)
break
else:
# TRY mapping the provided text index into an interface obj
_intfobj = self.device.find_match_intf(index)
if not (_intfobj is None):
for key, val in self._intfobj.items():
if (val==_intfobj):
snmpvals = self.get(oid=oid, index=key)
break
# Ensure we only parse a valid response...
if not (snmpvals is None):
tmp = [snmpvals.modName, datetime.now(), snmpvals.symName,
self._index[snmpvals.index], snmpvals.value]
return self.SNMPIndexed._make(tmp)
elif not isinstance(index, str):
raise ValueError, "index must be a string value"
else:
raise ValueError, "Must populate with SNMP.v2c.index() first"
def get(self, oid=None, index=None):
if isinstance(self._format(oid), tuple):
errorIndication, errorStatus, errorIndex, \
varBindTable = self.cmdGen.getCmd(
cmdgen.CommunityData('test-agent', self.community),
cmdgen.UdpTransportTarget((self.ipaddr, 161),
retries=self.query_retries,
timeout=self.query_timeout),
self._format(oid),
)
# Parsing only for now... no return value...
self._parse(errorIndication, errorStatus, errorIndex, varBindTable)
elif isinstance(oid, str) and isinstance(index, int):
errorIndication, errorStatus, errorIndex, \
varBindTable = self.cmdGen.getCmd(
# SNMP v2
cmdgen.CommunityData('test-agent', self.community),
# Transport
cmdgen.UdpTransportTarget((self.ipaddr, 161)),
cmdgen.MibVariable(oid).loadMibs(),
)
return self._parse_resolve(errorIndication, errorStatus,
errorIndex, [varBindTable])[0]
else:
raise ValueError, "Unknown oid format: %s" % oid
def bulkwalk(self, oid=None):
"""SNMP bulkwalk a device. NOTE: This often is faster, but does not work as well as a simple SNMP walk"""
if isinstance(self._format(oid), tuple):
errorIndication, errorStatus, errorIndex, varBindTable = self.cmdGen.bulkCmd(
cmdgen.CommunityData('test-agent', self.community),
cmdgen.UdpTransportTarget((self.ipaddr, 161),
retries=self.query_retries,
timeout=self.query_timeout),
0,
25,
self._format(oid),
)
return self._parse(errorIndication, errorStatus,
errorIndex, varBindTable)
elif isinstance(oid, str):
errorIndication, errorStatus, errorIndex, varBindTable = self.cmdGen.bulkCmd(
cmdgen.CommunityData('test-agent', self.community),
cmdgen.UdpTransportTarget((self.ipaddr, 161),
retries=self.query_retries,
timeout=self.query_timeout),
0,
25,
cmdgen.MibVariable(oid).loadMibs(),
)
return self._parse_resolve(errorIndication, errorStatus,
errorIndex, varBindTable)
else:
raise ValueError, "Unknown oid format: %s" % oid
def _parse_resolve(self, errorIndication=None, errorStatus=None,
errorIndex=None, varBindTable=None):
"""Parse MIB walks and resolve into MIB names"""
retval = list()
if errorIndication:
print errorIndication
else:
if errorStatus:
print '%s at %s\n' % (
errorStatus.prettyPrint(),
varBindTable[-1][int(errorIndex)-1]
)
else:
for varBindTableRow in varBindTable:
for oid, val in varBindTableRow:
(symName, modName), indices = cmdgen.mibvar.oidToMibName(
self.cmdGen.mibViewController, oid
)
val = cmdgen.mibvar.cloneFromMibValue(
self.cmdGen.mibViewController, modName, symName,
val)
# Try to parse the index as an int first,
# then as a string
try:
index = int(string.join(map(lambda v: v.prettyPrint(), indices), '.'))
except ValueError:
index = str(string.join(map(lambda v: v.prettyPrint(), indices), '.'))
# Re-format values as float or integer, if possible...
tmp = val.prettyPrint()
if re.search(r"""^\s*\d+\s*$""", tmp):
value = int64(tmp)
elif re.search(r"""^\s*\d+\.\d+\s*$""", tmp):
value = float64(tmp)
else:
value = tmp
retval.append(self.SNMPObject._make([modName,
datetime.now(), symName, index, value]))
return retval
def _parse(self, errorIndication, errorStatus, errorIndex,
varBindTable):
if errorIndication:
print errorIndication
else:
if errorStatus:
print '%s at %s\n' % (
errorStatus.prettyPrint(),
errorIndex and varBindTable[-1][int(errorIndex)-1] or '?'
)
else:
for varBindTableRow in varBindTable:
for name, val in varBindTableRow:
print '%s = %s' % (name.prettyPrint(), val.prettyPrint())
def _format(self, oid):
"""Format a numerical OID in the form of 1.3.4.1.2.1 into a tuple"""
if isinstance(oid, str):
if re.search('(\d+\.)+\d+', oid):
tmp = list()
for ii in oid.split('.'):
tmp.append(int(ii))
return tuple(tmp)
else:
return oid