Skip to content Skip to sidebar Skip to footer

Reading A Binary File Into A Struct

I have a binary file with a known format/structure. How do I read all the binary data in to an array of the structure? Something like (in pseudo code) bytes = read_file(filename) s

Solution 1:

Actually it looks like you're trying to read a list (or array) of structures from the file. The idiomatic way to do this in Python is use the struct module and call struct.unpack() in a loop—either a fixed number of times if you know the number of them in advance, or until end-of-file is reached—and store the results in a list. Here's an example of the latter:

import struct

# Record layout: int[5], float, byte[255]. The '=' prefix selects native byte
# order with standard sizes and no alignment padding.
struct_fmt = '=5if255s'
struct_len = struct.calcsize(struct_fmt)
struct_unpack = struct.Struct(struct_fmt).unpack_from

# Read the file one record at a time and collect the unpacked tuples.
results = []
with open(filename, "rb") as f:
    while True:
        data = f.read(struct_len)
        if not data:  # an empty read means end-of-file
            break
        # NOTE: a truncated final record (fewer than struct_len bytes) makes
        # unpack_from raise struct.error.
        s = struct_unpack(data)
        results.append(s)

The same results can also be obtained slightly more concisely using a list comprehension along with a short generator function helper (i.e. read_chunks() below):

def read_chunks(f, length):
    """Yield successive chunks read from the file-like object *f*.

    Each chunk is the result of ``f.read(length)``; iteration stops at the
    first empty read (end-of-file). The last chunk may be shorter than
    *length*.
    """
    while True:
        data = f.read(length)
        if not data:
            break
        yield data

# Unpack every struct_len-sized chunk of the file into a tuple.
with open(filename, "rb") as f:
    results = [struct_unpack(chunk) for chunk in read_chunks(f, struct_len)]

Update

You don't, in fact, need to explicitly define a helper function as shown above because you can use Python's built-in iter() function to dynamically create the needed iterator object in the list comprehension itself like so:

from functools import partial

with open(filename, "rb") as f:
    # iter(callable, sentinel) keeps calling f.read(struct_len) until it
    # returns b'' (end-of-file).
    results = [struct_unpack(chunk) for chunk in iter(partial(f.read, struct_len), b'')]

Solution 2:

Use the struct module; you need to define the types in a string format documented with that library:

# Unpack one record: two unsigned shorts ('HH'), a float ('f') and a 255-byte
# string ('255s'). `bytes` is the data read from the file in the question's
# pseudo code; note that the name shadows the builtin `bytes` type.
struct.unpack('=HHf255s', bytes)

The above example expects native byte-order, two unsigned shorts, a float and a string of 255 characters.

To loop over an already fully read bytes string, I'd use itertools; there is a handy grouper recipe that I've adapted here:

from itertools import izip_longest, imap
from struct import unpack, calcsize

# Each record is a 5-int header followed by a 256-int payload.
fmt_s = '=5i'
fmt_spec = '=256i'
size_s = calcsize(fmt_s)
size = size_s + calcsize(fmt_spec)


def chunked(iterable, n, fillvalue=''):
    """Yield the items of *iterable* joined into strings of *n* items each.

    Adapted from the itertools "grouper" recipe. The last group is padded
    with *fillvalue*; with the default '' it is simply shorter than *n*.

    NOTE: Python 2 only -- ``imap`` and ``izip_longest`` do not exist in
    Python 3's itertools, and ``''.join`` expects str items.
    """
    # The same iterator repeated n times makes izip_longest pull n
    # consecutive items for every group.
    args = [iter(iterable)] * n
    return imap(''.join, izip_longest(*args, fillvalue=fillvalue))


# One tuple per record: the 5 header ints followed by a nested tuple holding
# the 256 payload ints. `bytes` is the already fully read file content.
data = [unpack(fmt_s, section[:size_s]) + (unpack(fmt_spec, section[size_s:]),)
    for section in chunked(bytes, size)]
    

This produces tuples rather than lists, but it's easy enough to adjust if you have to:

# Variant that builds lists instead of tuples: each record is a list of the
# header values whose last element is a nested list of the payload values.
data = [list(unpack(fmt_s, section[:size_s])) + [list(unpack(fmt_spec, section[size_s:]))]
    for section in chunked(bytes, size)]

Solution 3:

Add comments

import struct

First just read the binary into an array

# Read the whole file as a byte string; the context manager closes the handle
# (the `file` builtin exists only in Python 2, `open` works everywhere).
with open('mbrcontent', 'rb') as mbr_file:
    mbr = mbr_file.read()

So you can just fetch some piece of the array

# Bytes 446..509 of the image: 64 bytes, later unpacked as four 16-byte entries.
partition_table = mbr[446:510] 

and then unpack it as an integer

# Interpret bytes 510..511 as one little-endian unsigned 16-bit integer;
# unpack() returns a tuple, so [0] extracts the single value.
signature = struct.unpack('<H', mbr[510:512])[0] 

a more complex example

little_endian = (signature == 0xaa55)  # should be True
print("Little endian: %s" % little_endian)

# struct format of one partition-table entry. Adjacent string literals are
# concatenated, so every field can carry its own comment.
PART_FMT = ('<' if little_endian else '>') + (
    "B"  # status (0x80 = bootable (active), 0x00 = non-bootable)
    # CHS of first block
    "B"  # Head
    "B"  # Sector is in bits 5-0; bits 9-8 of cylinder are in bits 7-6
    "B"  # bits 7-0 of cylinder
    "B"  # partition type
    # CHS of last block
    "B"  # Head
    "B"  # Sector is in bits 5-0; bits 9-8 of cylinder are in bits 7-6
    "B"  # bits 7-0 of cylinder
    "L"  # LBA of first sector in the partition
    "L"  # number of blocks in partition, in little-endian format
)

PART_SIZE = 16
fmt_size = struct.calcsize(PART_FMT)
# sanity check expectations
assert fmt_size == PART_SIZE, "Partition format string is %i bytes, not %i" % (fmt_size, PART_SIZE)

def cyl_sector(sector_cyl, cylinder7_0):
    """Decode the packed sector/cylinder bytes of a CHS address.

    Args:
        sector_cyl: byte holding the sector in bits 5-0 and bits 9-8 of the
            cylinder in bits 7-6.
        cylinder7_0: byte holding bits 7-0 of the cylinder.

    Returns:
        A ``(sector, cylinder)`` tuple.
    """
    sector = sector_cyl & 0x3F  # bits 5-0
    # bits 7-6 of sector_cyl contain bits 9-8 of the cylinder
    cyl_high = (sector_cyl >> 6) & 0x03
    cyl = (cyl_high << 8) | cylinder7_0
    return sector, cyl

# Decode and print each of the four partition-table entries.
for partition in range(4):
    # Pieces of the one-line summary; joined with spaces when printed.
    summary = ["Partition #%i" % partition]
    offset = PART_SIZE * partition
    (status, start_head, start_sector_cyl, start_cyl7_0, part_type,
     end_head, end_sector_cyl, end_cyl7_0, lba, blocks) = struct.unpack(
        PART_FMT, partition_table[offset:offset + PART_SIZE])
    if status == 0x80:
        summary.append("Bootable")
    elif status:
        summary.append("Unknown status [%s]" % hex(status))
    summary.append("Type=0x%x" % part_type)
    print(" ".join(summary))
    # cyl_sector() returns (sector, cylinder); name both so that each value
    # lands under the matching label below.
    start_sector, start_cyl = cyl_sector(start_sector_cyl, start_cyl7_0)
    end_sector, end_cyl = cyl_sector(end_sector_cyl, end_cyl7_0)
    print(" (Start: Heads:%i\tCyl:%i\tSect:%i)" % (start_head, start_cyl, start_sector))
    print(" (End: Heads:%i\tCyl:%i\tSect:%i)" % (end_head, end_cyl, end_sector))
    print(" LBA: %s" % lba)
    print(" Blocks: %s" % blocks)

Solution 4:

import os, re
import functools
import ctypes
from ctypes import string_at, byref, sizeof, cast, POINTER, pointer, create_string_buffer, memmove
import numpy as np
import pandas as pd

class _StructBase(ctypes.Structure):
    """ctypes.Structure base that can describe itself as a NumPy dtype.

    Subclasses declare ``_fields_`` as usual; fields declared by every class
    in the inheritance chain are combined (base-class fields first). Array
    fields other than ``c_char`` arrays are exposed element by element under
    the names ``name[0]``, ``name[1]``, ...
    """
    __type__ = 0
    _fields_ = []

    @classmethod
    def Offsetof(cls, field):
        """Return the byte offset of *field* inside the structure.

        *field* is either a plain field name or ``name[idx]`` for a single
        element of an array field.
        """
        pattern = r'(?P<field>\w+)\[(?P<idx>\d+)\]'

        mat = re.match(pattern, field)
        if mat:
            fields = dict(cls.Fields())
            base_name = mat.group('field')
            idx = int(mat.group('idx'))
            # fields[field] is the element type of the array.
            return cls.Offsetof(base_name) + idx * ctypes.sizeof(fields[field])
        return getattr(cls, field).offset

    @classmethod
    def DType(cls):
        """Return a NumPy structured dtype mirroring the structure layout.

        ``c_char`` arrays become fixed-width byte strings (``S<n>``); every
        other field must have a scalar type listed in the mapping below,
        otherwise ``KeyError`` is raised. ``itemsize`` is set to the ctypes
        size so that trailing padding is accounted for.
        """
        ctype_to_numpy = {
            ctypes.c_byte: np.byte,
            ctypes.c_ubyte: np.ubyte,
            ctypes.c_char: np.ubyte,

            ctypes.c_int8: np.int8,
            ctypes.c_int16: np.int16,
            ctypes.c_int32: np.int32,
            ctypes.c_int64: np.int64,

            ctypes.c_uint8: np.uint8,
            ctypes.c_uint16: np.uint16,
            ctypes.c_uint32: np.uint32,
            ctypes.c_uint64: np.uint64,

            ctypes.c_float: np.float32,
            ctypes.c_double: np.float64,
        }
        names, formats, offsets = [], [], []

        for name, ctype in cls.Fields():
            if hasattr(ctype, '_length_'):
                if ctype._type_ != ctypes.c_char:
                    # Array that Fields() did not expand: emit one entry per
                    # element at its own offset.
                    element = ctype._type_
                    base_offset = cls.Offsetof(name)
                    for i in range(ctype._length_):
                        names.append('%s[%d]' % (name, i))
                        formats.append(ctype_to_numpy[element])
                        offsets.append(base_offset + i * ctypes.sizeof(element))
                else:
                    names.append(name)
                    formats.append('S%d' % ctype._length_)
                    offsets.append(cls.Offsetof(name))
            else:
                names.append(name)
                formats.append(ctype_to_numpy[ctype])
                offsets.append(cls.Offsetof(name))
        return np.dtype({
            'names': names,
            'formats': formats,
            'offsets': offsets,
            'itemsize': ctypes.sizeof(cls),
        })

    @classmethod
    def Attr(cls):
        """Return ``(name, ctype)`` pairs for the fields declared on *cls* itself.

        Non-char array fields are expanded into one ``name[i]`` entry per
        element; ``c_char`` arrays are kept whole.
        """
        res = []
        for attr, tp in cls._fields_:
            if issubclass(tp, ctypes.Array) and tp._type_ is not ctypes.c_char:
                res.extend(('%s[%d]' % (attr, i), tp._type_)
                           for i in range(tp._length_))
            else:
                res.append((attr, tp))
        return res

    @classmethod
    def Fields(cls, notype=False):
        """Return the fields of *cls* and its bases, base-class fields first.

        Returns ``(name, ctype)`` pairs, or only the names when *notype* is
        true.
        """
        per_class = [cls.Attr()]
        cur_cls = cls
        # Follow the first base of each class up to ctypes.Structure.
        while True:
            cur_cls = cur_cls.__bases__[0]
            if cur_cls == ctypes.Structure:
                break
            per_class.append(cur_cls.Attr())
        merged = functools.reduce(list.__add__, reversed(per_class), [])
        if notype:
            return [name for name, _ in merged]
        return merged

    @classmethod
    def size(cls):
        """Return the size of the structure in bytes."""
        return ctypes.sizeof(cls)

    @classmethod
    def from_struct_binary(cls, path, max_count=2 ** 32, decode=True):
        """Load a file of consecutive records into a pandas DataFrame.

        Args:
            path: file whose size must be a multiple of ``cls.size()``.
            max_count: maximum number of records to read.
            decode: when true, byte-string columns are decoded to text
                (UTF-8 first, GBK if UTF-8 fails for the column).

        Returns:
            A DataFrame with one row per record and one column per dtype field.
        """
        file_size = os.path.getsize(path)
        record_size = cls.size()
        print(file_size, record_size)
        assert file_size % record_size == 0
        size = min(file_size // record_size, max_count)

        dtype = cls.DType()
        array = np.fromfile(path, dtype=dtype, count=size)

        df = pd.DataFrame(array, index=range(size))
        if decode:
            for attr in dtype.names:
                # Only fixed-width byte-string fields need decoding.
                if dtype.fields[attr][0].kind != 'S':
                    continue
                try:
                    df[attr] = df[attr].map(lambda x: x.decode("utf-8"))
                except UnicodeDecodeError:
                    df[attr] = df[attr].map(lambda x: x.decode("gbk"))
        return df

class StructBase(_StructBase):
    """Common record header: a single unsigned 32-bit ``Type`` field."""
    _fields_ = [
        ('Type', ctypes.c_uint32),
    ]

class IndexStruct(StructBase):
    """Example record layout; these fields follow the ones inherited from StructBase."""
    _fields_ = [
        ('Seq', ctypes.c_uint32),
        ('ExID', ctypes.c_char * 8),
        ('SecID', ctypes.c_char * 8),
        ('SecName', ctypes.c_char * 16),
        ('SourceID', ctypes.c_int32),
        ('Time', ctypes.c_uint32),
        ('PreClose', ctypes.c_uint32),
        ('Open', ctypes.c_uint32),
        ('High', ctypes.c_uint32),
        ('Low', ctypes.c_uint32),
        ('Match', ctypes.c_uint32),
    ]

# Load every IndexStruct record from the file into a DataFrame and show it.
# NOTE(review): 'your path' looks like a placeholder for the real file path.
df = IndexStruct.from_struct_binary('your path')
print(df)

Post a Comment for "Reading A Binary File Into A Struct"