Module errgrep.non_blocking_read_thread_test

Expand source code
import io

from .non_blocking_read_thread import NonBlockingReadThread

def test_non_blocking_read_thread(tmp_path):
    file_path = tmp_path / 'tmp'
    with open(file_path, 'w') as f:
        f.write('hello\n')
        f.write('world\n')

    t = NonBlockingReadThread(open(file_path, 'r'))
    t.start()

    # shouldn't die since we didn't empty the queue
    assert t.is_alive()

    line = t.lines_queue.get()
    assert line == 'hello'

    line = t.lines_queue.get()
    assert line == 'world'

    assert t.lines_queue.empty()

    # now its empty and can die
    t.join(1)
    assert not t.is_alive()

Functions

def test_non_blocking_read_thread(tmp_path)
Expand source code
def test_non_blocking_read_thread(tmp_path):
    file_path = tmp_path / 'tmp'
    with open(file_path, 'w') as f:
        f.write('hello\n')
        f.write('world\n')

    t = NonBlockingReadThread(open(file_path, 'r'))
    t.start()

    # shouldn't die since we didn't empty the queue
    assert t.is_alive()

    line = t.lines_queue.get()
    assert line == 'hello'

    line = t.lines_queue.get()
    assert line == 'world'

    assert t.lines_queue.empty()

    # now its empty and can die
    t.join(1)
    assert not t.is_alive()