goelakash
goelakash

Reputation: 2519

Getting same process details as task manager in Windows

I have written a program to get memory details of the current running processes using psutil in Python 3. The problem is that the values I get are not the same as those from the Windows Task Manager. Specifically, how do I obtain the private working set size of a process in Python?

Upvotes: 2

Views: 3288

Answers (1)

Eryk Sun
Eryk Sun

Reputation: 34270

psutil calls GetProcessMemoryInfo, which doesn't break the working set down by private vs shared memory. To get this information you can use the Windows performance counter API. Another way, which I demonstrate below, is to directly count the number of shared pages. QueryWorkingSet returns an array of PSAPI_WORKING_SET_BLOCK entries (one per page in the working set), for which you can tally the entries that have the Shared field set. You'll need a process handle, which you can get by calling either GetCurrentProcess or OpenProcess. To convert from pages to bytes, get the system page size by calling either GetPerformanceInfo or GetSystemInfo.

The downside to this approach is that you'll need PROCESS_VM_READ and PROCESS_QUERY_INFORMATION access to the process. If the current user is an elevated administrator, enabling SeDebugPrivilege typically gets around the access check, except for 'protected' processes.

from ctypes import *
from ctypes.wintypes import *
from collections import namedtuple

# Names exported by a star-import of this module.
__all__ = ['query_working_set', 'working_set_size']

# Load the DLLs with use_last_error=True so that ctypes keeps a private copy
# of the last-error value for calls made through them; errcheck_bool reads
# that copy with get_last_error().
kernel32 = WinDLL('kernel32', use_last_error=True)
psapi = WinDLL('psapi', use_last_error=True)

# Process access rights requested from OpenProcess in query_working_set.
PROCESS_VM_READ           = 0x0010
PROCESS_QUERY_INFORMATION = 0x0400

# Win32 error codes that this module tests for explicitly.
ERROR_ACCESS_DENIED = 0x0005
ERROR_BAD_LENGTH    = 0x0018

# Aliases for the Windows typedef names used in the structure definitions.
ULONG_PTR = WPARAM
SIZE_T = c_size_t

class PSAPI_WORKING_SET_BLOCK(Union):
    """One working-set entry, describing a single page.

    The entry is a pointer-sized word that can be read either as the raw
    integer ``Flags`` or through the bit fields ``Protection``,
    ``ShareCount``, ``Shared``, ``Reserved`` and ``VirtualPage``, which are
    exposed directly on the union via the anonymous ``_flags`` member.
    """
    class _FLAGS(Structure):
        # The four flag fields use the low 12 bits.  VirtualPage takes all
        # remaining bits of the pointer-sized word: 20 bits when ULONG_PTR
        # is 4 bytes and 52 bits when it is 8 bytes.  Hard-coding 20 would
        # truncate page numbers in a 64-bit process.
        _fields_ = (('Protection',  ULONG_PTR,  5),
                    ('ShareCount',  ULONG_PTR,  3),
                    ('Shared',      ULONG_PTR,  1),
                    ('Reserved',    ULONG_PTR,  3),
                    ('VirtualPage', ULONG_PTR,
                     sizeof(ULONG_PTR) * 8 - 12))
    _anonymous_ = '_flags',
    _fields_ = (('Flags', ULONG_PTR),
                ('_flags', _FLAGS))

class PSAPI_WORKING_SET_INFORMATION(Structure):
    """Entry count followed by a variable-length array of page entries.

    The declared ``_WorkingSetInfo`` field holds a single entry and only
    marks where the array starts; use the ``WorkingSetInfo`` property to
    view all ``NumberOfEntries`` entries.
    """
    _fields_ = (('NumberOfEntries',  ULONG_PTR),
                ('_WorkingSetInfo', PSAPI_WORKING_SET_BLOCK * 1))

    @property
    def WorkingSetInfo(self):
        """Return a PSAPI_WORKING_SET_BLOCK array of NumberOfEntries items.

        The array is created with ``from_buffer`` at the offset of the
        ``_WorkingSetInfo`` field, so it views this object's own buffer
        rather than copying it.
        """
        start = PSAPI_WORKING_SET_INFORMATION._WorkingSetInfo.offset
        entries_type = PSAPI_WORKING_SET_BLOCK * self.NumberOfEntries
        return entries_type.from_buffer(self, start)

# Pointer type used for the output-buffer parameter of QueryWorkingSet.
PPSAPI_WORKING_SET_INFORMATION = POINTER(PSAPI_WORKING_SET_INFORMATION)

def errcheck_bool(result, func, args):
    """ctypes ``errcheck`` hook for functions that signal failure with a
    falsy result.

    Returns ``args`` unchanged when ``result`` is truthy; otherwise raises
    the exception built by ``WinError`` from ``get_last_error()``.
    """
    if result:
        return args
    raise WinError(get_last_error())

# QueryWorkingSet: a falsy result is turned into an exception by
# errcheck_bool.
psapi.QueryWorkingSet.errcheck = errcheck_bool
psapi.QueryWorkingSet.argtypes = (
    HANDLE,                         # _In_  hProcess
    PPSAPI_WORKING_SET_INFORMATION, # _Out_ pv
    DWORD)                          # _In_  cb

# Declare the result as HANDLE; ctypes would otherwise assume a C int.
kernel32.GetCurrentProcess.restype = HANDLE

# OpenProcess: a falsy (NULL) handle is turned into an exception by
# errcheck_bool.
kernel32.OpenProcess.errcheck = errcheck_bool
kernel32.OpenProcess.restype = HANDLE
kernel32.OpenProcess.argtypes = (
    DWORD, # _In_ dwDesiredAccess
    BOOL,  # _In_ bInheritHandle
    DWORD) # _In_ dwProcessId

def query_working_set(pid=None):
    """Return the PSAPI_WORKING_SET_BLOCK array for the target process.

    pid -- process ID to inspect, or None (the default) for the current
           process.

    The process is opened with PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
    access, and the handle is closed again before returning.

    Raises OSError (via errcheck_bool) if the process cannot be opened or
    if QueryWorkingSet fails with anything other than ERROR_BAD_LENGTH.
    """
    if pid is None:
        # GetCurrentProcess returns a pseudo-handle that need not be closed.
        hprocess = kernel32.GetCurrentProcess()
        must_close = False
    else:
        access = PROCESS_VM_READ | PROCESS_QUERY_INFORMATION
        hprocess = kernel32.OpenProcess(access, False, pid)
        must_close = True
    try:
        info = PSAPI_WORKING_SET_INFORMATION()
        base_size = sizeof(info)
        item_size = sizeof(PSAPI_WORKING_SET_BLOCK)
        overshoot = 0
        while True:
            # The working set can grow between calls, so ask for more room
            # than the last reported entry count, and widen the margin on
            # every retry.
            overshoot += 4096
            n = info.NumberOfEntries + overshoot
            resize(info, base_size + n * item_size)
            try:
                psapi.QueryWorkingSet(hprocess, byref(info), sizeof(info))
                break
            except OSError as e:
                # ERROR_BAD_LENGTH means the buffer was too small: retry
                # with a larger one.  Anything else is a real failure.
                if e.winerror != ERROR_BAD_LENGTH:
                    raise
        return info.WorkingSetInfo
    finally:
        if must_close:
            # Wrap in HANDLE so the value is passed pointer-sized; no
            # argtypes are declared for CloseHandle.
            kernel32.CloseHandle(HANDLE(hprocess))

class PERFORMANCE_INFORMATION(Structure):
    """Structure passed to GetPerformanceInfo.

    ``cb`` is set to the size of the structure on construction, after any
    positional or keyword field initializers have been applied.
    """
    _fields_ = (
        (('cb', DWORD),)
        + tuple((name, SIZE_T) for name in (
            'CommitTotal', 'CommitLimit', 'CommitPeak',
            'PhysicalTotal', 'PhysicalAvailable', 'SystemCache',
            'KernelTotal', 'KernelPaged', 'KernelNonpaged',
            'PageSize'))
        + tuple((name, DWORD) for name in (
            'HandleCount', 'ProcessCount', 'ThreadCount')))

    def __init__(self, *args, **kwds):
        super().__init__(*args, **kwds)
        # Always overwrite cb, whatever initializers the caller supplied.
        self.cb = sizeof(self)

# Pointer type used for the output parameter of GetPerformanceInfo.
PPERFORMANCE_INFORMATION = POINTER(PERFORMANCE_INFORMATION)

# GetPerformanceInfo: a falsy result is turned into an exception by
# errcheck_bool.
psapi.GetPerformanceInfo.errcheck = errcheck_bool
psapi.GetPerformanceInfo.argtypes = (
    PPERFORMANCE_INFORMATION, # _Out_ pPerformanceInformation
    DWORD)                    # _In_  cb

# Result of working_set_size(): sizes in bytes, as (total, shared, private).
WorkingSetSize = namedtuple('WorkingSetSize', ['total', 'shared', 'private'])

def working_set_size(pid=None):
    """Return the working set sizes of the target process, in bytes.

    pid -- process ID to inspect, or None (the default) for the current
           process.

    The result is a WorkingSetSize(total, shared, private) tuple.  Pages
    whose Shared flag is set count as shared; all others count as private.
    """
    blocks = query_working_set(pid)

    # The system page size converts page counts into byte counts.
    perf = PERFORMANCE_INFORMATION()
    psapi.GetPerformanceInfo(byref(perf), sizeof(perf))
    page_bytes = perf.PageSize

    shared_pages = sum(1 for block in blocks if block.Shared)
    total_bytes = len(blocks) * page_bytes
    shared_bytes = shared_pages * page_bytes
    return WorkingSetSize(total=total_bytes,
                          shared=shared_bytes,
                          private=total_bytes - shared_bytes)

if __name__ == '__main__':
    # Usage: workingset.py [pid]  -- with no pid, report on this process.
    import sys
    cli_args = sys.argv[1:]
    pid = int(cli_args[0]) if cli_args else None
    try:
        sizes = working_set_size(pid)
    except OSError as e:
        # Give a short message for the common access failure; re-raise
        # anything else with its traceback.
        if e.winerror != ERROR_ACCESS_DENIED:
            raise
        sys.exit('Access Denied')
    total, shared, private = sizes
    # Right-align all three numbers to the width of the largest (total).
    width = len(str(total))
    print('Working Set: %*d' % (width, total))
    print('     Shared: %*d' % (width, shared))
    print('    Private: %*d' % (width, private))

For example:

C:\>tasklist /fi "imagename eq explorer.exe"

Image Name                     PID Session Name        Session#    Mem Usage
========================= ======== ================ =========== ============
explorer.exe                  2688 Console                    1     66,048 K

C:\>workingset.py 2688
Working Set: 67465216
     Shared: 59142144
    Private:  8323072

The following demonstrates getting denied access to a system process, even as an administrator. Usually enabling SeDebugPrivilege gets around this (note the privilege has to be present in the process token in order to enable it; you can't just add privileges to a token). Showing how to enable and disable privileges in an access token is beyond the scope of this answer, but below I demonstrate that it does work, at least for non-protected processes.

C:\>tasklist /fi "imagename eq winlogon.exe"

Image Name                     PID Session Name        Session#    Mem Usage
========================= ======== ================ =========== ============
winlogon.exe                   496 Console                    1      8,528 K

C:\>workingset.py 496
Access Denied

C:\>python
>>> from workingset import *
>>> from privilege import enable_privilege
>>> enable_privilege('SeDebugPrivilege')
>>> working_set_size(496)
WorkingSetSize(total=8732672, shared=8716288, private=16384)

Upvotes: 4

Related Questions