Hongbo Miao

Reputation: 49804

subscribe does not work after upgrading RxPy from 1.x to 3.x

I am using Python 3.7.3.

I am trying to upgrade RxPy from 1.6.1 (1.x) to 3.0.0a3 (3.x).

Old code using RxPy 1.x

from rx import Observable
import psutil
import numpy as np
import pylab as plt

cpu_data = (Observable
            .interval(100)  # Each 100 milliseconds
            .map(lambda x: psutil.cpu_percent())
            .publish())
cpu_data.connect()


def monitor_cpu(npoints):
    lines, = plt.plot([], [])
    plt.xlim(0, npoints)
    plt.ylim(0, 100)

    cpu_data_window = cpu_data.buffer_with_count(npoints, 1)

    def update_plot(cpu_readings):
        lines.set_xdata(np.arange(len(cpu_readings)))
        lines.set_ydata(np.array(cpu_readings))
        plt.draw()

    alertpoints = 4
    high_cpu = (cpu_data
                .buffer_with_count(alertpoints, 1)
                .map(lambda readings: all(r > 20 for r in readings)))

    label = plt.text(1, 1, "normal")

    def update_warning(is_high):
        if is_high:
            label.set_text("high")
        else:
            label.set_text("normal")

    high_cpu.subscribe(update_warning)
    cpu_data_window.subscribe(update_plot)

    plt.show()


if __name__ == '__main__':
    monitor_cpu(10)

If you run the code you can see a real-time CPU monitor chart.


However, after I installed the new RxPy with

pip3 install --pre rx

and switched to the new code below, the window stays blank and white, with no live chart.

The function update_plot is never called. Any idea why?


New code using RxPy 3.x

from rx import interval, operators as op
import psutil
import numpy as np
import pylab as plt


cpu_data = interval(100).pipe(  # Each 100 milliseconds
    op.map(lambda x: psutil.cpu_percent()),
    op.publish())
cpu_data.connect()


def monitor_cpu(npoints):
    lines, = plt.plot([], [])
    plt.xlim(0, npoints)
    plt.ylim(0, 100)

    cpu_data_window = cpu_data.pipe(
        op.buffer_with_count(npoints, 1))

    def update_plot(cpu_readings):
        print('update')  # this line never runs
        lines.set_xdata(np.arange(len(cpu_readings)))
        lines.set_ydata(np.array(cpu_readings))
        plt.draw()

    alertpoints = 4
    high_cpu = cpu_data.pipe(
                op.buffer_with_count(alertpoints, 1),
                op.map(lambda readings: all(r > 20 for r in readings)))

    label = plt.text(1, 1, "normal")

    def update_warning(is_high):
        if is_high:
            label.set_text("high")
        else:
            label.set_text("normal")

    high_cpu.subscribe(update_warning)
    cpu_data_window.subscribe(update_plot)

    plt.show()


if __name__ == '__main__':
    monitor_cpu(10)

Upvotes: 0

Views: 351

Answers (1)

Diego Torres Milano

Reputation: 69218

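In RxPy 3.x, time arguments are in seconds, not milliseconds. interval(100) therefore emits once every 100 seconds, so the subscribers appear never to run. Pass 0.1 for a 100-millisecond period: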

cpu_data = interval(0.1).pipe(  # Each 100 milliseconds
    op.map(lambda x: psutil.cpu_percent()),
    op.publish())
cpu_data.connect()
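
To see why update_plot seemed never to run, here is a rough standard-library sketch of the arithmetic. It assumes one reading per period, and that buffer_with_count(npoints, 1) emits its first list only once npoints readings have arrived. The names ms_to_period and first_window_delay are hypothetical helpers, not part of RxPy.

```python
from datetime import timedelta


def ms_to_period(milliseconds):
    """Convert an RxPy 1.x millisecond value to a timedelta (hypothetical helper)."""
    return timedelta(milliseconds=milliseconds)


def first_window_delay(period_seconds, npoints):
    """Seconds until a window of npoints readings is first full.

    Assumes one reading per period, with the first reading arriving
    one period after subscribing.
    """
    return period_seconds * npoints


# The value 100 from the question, read as seconds (3.x behaviour).
misread = first_window_delay(100, 10)

# The intended 100 ms period, expressed in seconds.
intended = first_window_delay(ms_to_period(100).total_seconds(), 10)

print(misread, intended)
```

Under these assumptions the first plot update with interval(100) would arrive only after roughly 1000 seconds, against about 1 second with interval(0.1). RxPy 3.x should also accept a timedelta as the period, so interval(ms_to_period(100)) may be a convenient way to port 1.x millisecond values, though I have not checked this against every 3.x pre-release.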

Upvotes: 1
