Run class methods in threads (Python) — tags: multithreading

Run Class methods in threads (python)


If you call them from the class, it is as simple as:

import threading


class DomainOperations:
    """Run two independent operations on separate threads and collect results.

    The results are stored on the instance as ``domain_ip`` and
    ``website_thumbnail``; both start out as empty strings.
    """

    def __init__(self):
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        """Store the resolved IP on the instance (placeholder value here)."""
        self.domain_ip = 'foo'

    def generate_website_thumbnail(self):
        """Store the thumbnail on the instance (placeholder value here)."""
        self.website_thumbnail = 'bar'

    def run(self):
        """Run both operations concurrently, wait for both, print the results."""
        workers = (
            threading.Thread(target=self.resolve_domain),
            threading.Thread(target=self.generate_website_thumbnail),
        )
        # Start every worker before joining any of them so they overlap.
        for worker in workers:
            worker.start()
        # Block until both attributes have been written.
        for worker in workers:
            worker.join()
        print(self.domain_ip, self.website_thumbnail)


if __name__ == '__main__':
    d = DomainOperations()
    d.run()


Alternatively, DomainOperations can inherit from the Thread class, which makes the code cleaner and easier to understand. In that case you have to override the run() method.

from threading import Thread


class DomainOperations(Thread):
    """Thread that resolves the domain itself and uses a helper thread for the thumbnail.

    After the thread has finished, the results are available on the instance
    as ``domain_ip`` and ``website_thumbnail``.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.domain_ip = ''
        self.website_thumbnail = ''

    def resolve_domain(self):
        """Store the resolved IP on the instance (placeholder value here)."""
        self.domain_ip = 'foo'

    def generate_website_thumbnail(self):
        """Store the thumbnail on the instance (placeholder value here)."""
        self.website_thumbnail = 'bar'

    def run(self):
        """Thread body: run both operations concurrently, then print the results."""
        # Start the helper thread first so the thumbnail is generated while
        # this thread resolves the domain.
        thread2 = Thread(target=self.generate_website_thumbnail)
        thread2.start()
        # The domain is resolved on this (the DomainOperations) thread.
        self.resolve_domain()
        # Wait for the helper only. A thread must not join itself: calling
        # self.join() from inside run() raises RuntimeError.
        thread2.join()
        # Both attributes are set by now; print them before this thread exits.
        print(self.domain_ip, self.website_thumbnail)

Then start the thread like this:

if __name__ == '__main__':
    # start() schedules DomainOperations.run() on a new thread and returns
    # immediately.
    thread1 = DomainOperations()
    thread1.start()
    # Wait explicitly for the thread to finish before the script continues.
    thread1.join()


# Serializes appends to the shared CSV file across worker threads.
_CSV_LOCK = threading.Lock()


def post_test(tbid, line_num, response_time):
    """POST one query for ``tbid``, time it, and append the result to text6.csv.

    Only responses whose JSON body has ``statusCode == '200'`` are printed and
    recorded; any other response is ignored.

    :param tbid: id sent as the ``tbId`` request parameter
    :param line_num: number of ids in the run (factor in the throughput figure)
    :param response_time: number of rounds in the run (factor in the
        throughput figure)
    :return: None
    """
    # Request parameters.
    data = {'tbId': tbid, 'conditions': [{"key": "", "type": 1}], 'pageNum': 1, 'pageSize': 12}
    # Time the POST request itself.
    start = time.time()
    r = requests.post(url=url, data=json.dumps(data), headers=headers)
    end = time.time()
    elapsed = end - start
    # Keep two decimal places.
    finall_time = round(elapsed, 2)
    text = json.loads(r.text)
    # Only successful (200) responses are written out.
    if text['statusCode'] != '200':
        return
    # A very fast response rounds to 0.00; fall back to the raw measurement
    # so the division below cannot raise ZeroDivisionError.
    divisor = finall_time or elapsed
    if divisor:
        throughput = round(line_num * response_time / divisor, 2)
    else:
        throughput = float('inf')
    print('the perf_counter time of %s is %s and the content is %s ,throughput is %s' % (
        tbid, finall_time, text, throughput))
    # Several worker threads append to the same file; write one row at a time.
    with _CSV_LOCK:
        with open('text6.csv', 'a', newline='') as csvfile:
            spamwriter = csv.writer(csvfile, dialect='excel')
            spamwriter.writerow([tbid] + [finall_time] + [throughput])


def start_thread(csv_name):
    """Run ``post_test`` for every id read from ``csv_name``, five rounds in total.

    Within a round, one thread per id is started and all of them run
    concurrently; the next round begins once every thread has finished.

    :param csv_name: passed to ``read_csv`` (defined elsewhere), whose first
        returned value is used as the collection of ids
    :return: None
    """
    tbid, _response_time_sort, _throughput_sort = read_csv(csv_name)
    print(tbid)
    line_num = len(tbid)
    response_times = 5
    for _ in range(response_times):
        workers = [
            threading.Thread(target=post_test, args=(i, line_num, response_times))
            for i in tbid
        ]
        # Start every thread before joining any of them; joining right after
        # each start() would make the requests run one after another.
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()

I don't know how to call a method in a class from a thread, especially if the class has initialization parameters, but you can try this approach. I'm using multiple threads (`threading.Thread`) to solve this problem.