diff --git a/Pipfile b/Pipfile index 8c9f6671..8e3f28b9 100644 --- a/Pipfile +++ b/Pipfile @@ -8,13 +8,15 @@ name = "pypi" Cython = "*" # matham's next-gen async port of kivy Kivy = {git = "git://github.com/matham/kivy.git", ref = "async-loop"} +tractor = {git = "git://github.com/tgoodlet/tractor.git", ref = "master"} pdbpp = "*" msgpack = "*" +trio = "*" [dev-packages] pytest = "*" pdbpp = "*" -"e1839a8" = {path = ".", editable = true} +piker = {editable = true, path = "."} [requires] python_version = "3.6" diff --git a/Pipfile.lock b/Pipfile.lock index 7eec1f27..b948d859 100644 --- a/Pipfile.lock +++ b/Pipfile.lock @@ -1,7 +1,7 @@ { "_meta": { "hash": { - "sha256": "8d6521c10626550c727da281b89d0b6df5d31eb7dc5fe6e7aadf3de58bae44a7" + "sha256": "3ecdccfdf0abbb69730204c608319fcd7b151115a0e183dae6cb2c0748beeb58" }, "pipfile-spec": 6, "requires": { @@ -59,40 +59,37 @@ }, "cython": { "hashes": [ - "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5", - "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff", - "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d", - "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355", - "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526", - "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c", - "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f", - "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729", - "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88", - "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224", - "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325", - "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe", - "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15", - "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4", - "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9", - "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62", - "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437", - "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142", - "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17", - "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214", - "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea", - "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2", - "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87", - "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f", - "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff", - "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304", - "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3", - "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64", - "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed", - "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e", - "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9" + "sha256:01487236575df8f17b46982071438dce4f7eaf8acc8fb99fca3510d343cd7a28", + "sha256:0671d17c7a27634d6819246e535241b951141ed0e3f6f2a6d618fd32344dae3e", + "sha256:0e6190d6971c46729f712dd7307a9c0a8c027bfa5b4d8f2edef106b01759926c", + "sha256:202587c754901d0678bd6ff89c707f099987928239049a528470c06c6c922cf8", + "sha256:345197ba9278cf6a914cb7421dc665a0531a219b0072abf6b0cebfdf68e75725", + "sha256:3a296b8d6b02f0e01ab04bedea658f43eef5ad2f8e586a820226ead1a677d9b1", + "sha256:484572a2b22823a967be106137a93f7d634db116b3f7accb37dbd760eda2fa9f", + "sha256:4c67c9c803e50ceff32cc5e4769c50fc8ae8df9c4e5cc592ce8310b5a1076d23", + "sha256:539038087c321911745fc2e77049209b1231300d481cb4d682b2f95c724814b3", + "sha256:58113e0683c3688594c112103d7e9f2d0092fd2d8297a220240bea22e184dfdd", + "sha256:65cb25ca4284804293a2404d1be3b5a98818be21a72791649bacbcfa4e431d41", + "sha256:699e765da2580e34b08473fc0acef3a2d7bcb7f13eb29401cd25236bcf000080", + "sha256:6b54c3470810cea49a8be90814d05c5325ceb9c5bf429fd86c36fc1b32dfc157", + "sha256:71ac1629e4eae2ed329be8caf45efea10bfe1af3d8767e12e64b83e4ea5a3250", + "sha256:722c179d3df8677f3daf45b1a2764678ed4f0aaddbaa7211a8a08ebfd907c0db", + "sha256:76ac2b08d3d956d77b574bb43cbf1d37bd58b9d50c04ba281303e695854ebc46", + "sha256:7eff1157be9e26bf7494288c89979ca69d593a009e2c7420a739e2cf1e0635f5", + "sha256:99546c8696d27d0efa639c77b2f8af6e61dc3a5073caae4f27ffd991ca926f42", + "sha256:a0c263b31d335f29c11f4a9e98fbcd908d0731d4ea99bfd27c1c47caaeb4ca2e", + "sha256:a29c66292605bff962adc26530c030607aa699206b12dfb84f131b0454e15df4", + "sha256:a4d3724c5a1ddd86d7d830d8e02c40151839b833791dd4b6fe9e144380fa7d37", + "sha256:aed9f33b19d542eea56c38ef3862ca56147f7903648156cd57eabb0fe47c35d6", + "sha256:b57e733dd8871d2cc7358c2e0fe33027453afffbcd0ea6a537f54877cad5131c", + "sha256:d5bf4db62236e82955c40bafbaa18d54b20b5ceefa06fb57c7facc443929f4bd", + "sha256:d9272dd71ab78e87fa34a0a59bbd6acc9a9c0005c834a6fc8457ff9619dc6795", + "sha256:e9d5671bcbb90a41b0832fcb3872fcbaca3d68ff11ea09724dd6cbdf31d947fb", + "sha256:ee54646afb2b73b293c94cf079682d18d404ebd6c01122dc3980f111aec2d8ae", + "sha256:f16a87197939977824609005b73f9ebb291b9653a14e5f27afc1c5d6f981ba39" ], "index": "pypi", - "version": "==0.28.3" + "version": "==0.28.4" }, "e1839a8": { "editable": true, @@ -173,45 +170,54 @@ "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160", "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8", "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6", + "sha256:4130e5ae16c656b7de654dc5e595cfeb85d3a4b0bb0734d19c0dce6dc7ee0e07", "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5", "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da", "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388", + "sha256:5ae3564cb630e155a650f4f9c054589848e97836bebae5637240a0d8099f817b", "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2", "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3", "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50", "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9", "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b", "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b", + "sha256:91fdd510743ae4df862dbd51a4354519dd9fb8941347526cd9c2194b792b3da9", "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e", + "sha256:9b705f18b26fb551366ab6347ba9941b62272bf71c6bbcadcd8af94d10535241", "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e", "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996", "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac", "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51", "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608", "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9", - "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418" + "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418", + "sha256:e1d18421a7e2ad4a655b76e65d549d4159f8874c18a417464c1d439ee7ccc7cd" ], "version": "==1.14.5" }, "pandas": { "hashes": [ - "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6", - "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd", - "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca", - "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f", - "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2", - "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17", - "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690", - "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1", - "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270", - "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60", - "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c", - "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7", - "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f", - "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8", - "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0" + "sha256:05ac350f8a35abe6a02054f8cf54e0c048f13423b2acb87d018845afd736f0b4", + "sha256:174543cd68eaee60620146b38faaed950071f5665e0a4fa4adfdcfc23d7f7936", + "sha256:1a62a237fb7223c11d09daaeaf7d15f234bb836bfaf3d4f85746cdf9b2582f99", + "sha256:2c1ed1de5308918a7c6833df6db75a19c416c122921824e306c64a0626b3606c", + "sha256:33825ad26ce411d6526f903b3d02c0edf627223af59cf4b5876aa925578eec74", + "sha256:4c5f76fce8a4851f65374ea1d95ca24e9439540550e41e556c0879379517a6f5", + "sha256:67504a96f72fb4d7f051cfe77b9a7bb0d094c4e2e5a6efb2769eb80f36e6b309", + "sha256:683e0cc8c7faececbbc06aa4735709a07abad106099f165730c1015da916adec", + "sha256:77cd1b485c6a860b950ab3a85be7b5683eaacbc51cadf096db967886607d2231", + "sha256:814f8785f1ab412a7e9b9a8abb81dfe8727ebdeef850ecfaa262c04b1664000f", + "sha256:894216edaf7dd0a92623cdad423bbec2a23fc06eb9c85483e21876d1ef8f47e9", + "sha256:9331e20a07360b81d8c7b4b50223da387d264151d533a5a5853325800e6631a4", + "sha256:9cd3614b4e31a0889388ff1bd19ae857ad52658b33f776065793c293a29cf612", + "sha256:9d79e958adcd037eba3debbb66222804171197c0f5cd462315d1356aa72a5a30", + "sha256:b90e5d5460f23607310cbd1688a7517c96ce7b284095a48340d249dfc429172e", + "sha256:bc80c13ffddc7e269b706ed58002cc4c98cc135c36d827c99fb5ca54ced0eb7a", + "sha256:cbb074efb2a5e4956b261a670bfc2126b0ccfbf5b96b6ed021bc8c8cb56cf4a8", + "sha256:e8c62ab16feeda84d4732c42b7b67d7a89ad89df7e99efed80ea017bdc472f26", + "sha256:ff5ef271805fe877fe0d1337b6b1861113c44c75b9badb595c713a72cd337371" ], - "version": "==0.23.1" + "version": "==0.23.3" }, "pdbpp": { "hashes": [ @@ -236,10 +242,10 @@ }, "pytz": { "hashes": [ - "sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555", - "sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749" + "sha256:a061aa0a9e06881eb8b3b2b43f05b9439d6583c206d0a6c340ff72a7b6669053", + "sha256:ffb9ef1de172603304d9d2819af6f5ece76f2e85ec10692a524dd876e72bf277" ], - "version": "==2018.4" + "version": "==2018.5" }, "six": { "hashes": [ @@ -255,11 +261,16 @@ ], "version": "==2.0.4" }, + "tractor": { + "git": "git://github.com/tgoodlet/tractor.git", + "ref": "f726bd81da3a9f9f0754ffa26c92edb3c3a68608" + }, "trio": { "hashes": [ "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" ], + "index": "pypi", "version": "==0.4.0" }, "wmctrl": { @@ -320,44 +331,37 @@ }, "cython": { "hashes": [ - "sha256:0344e9352b0915910e212c38403b63f902ce1cba75dde7a43a9112ff960eb2a5", - "sha256:0a390c39e912fc5f82d5feae2d16ea061971407099e1efb0fecb255cb96fbeff", - "sha256:0f2b2e09f94c498f555935e732b7321b5f62f00e7a789238f6c5ddd66987a54d", - "sha256:15614592616b6dd5e919e158796350ebeba6cb6b5d2998cfff41b53f568c8355", - "sha256:1aae6d6e9858888144cea147eb5e677830f45faaff3d305d77378c3cba55f526", - "sha256:200583297f23e558744bc4688d8a2b2605ab6ad7d1494a9fd8c8094ad65ebf3c", - "sha256:295facc211a6b55db9979455b856180f2839be22ab767ffdea55986bee83ca9f", - "sha256:36c16bf39280fe857213d8da31c07a6179d3878c3dc2e435dce0974b9f8f0729", - "sha256:3fef8dfa9cf86ab7814ca31369374ddd5b9524f54406aa83b53b5937965b8e88", - "sha256:439d233d3214e3d69c033a9a93516758f2c8a03e83ea51ae14b6eed13687d224", - "sha256:455ab39c6c0849a6c008fcdf2fae42475f18d0801a3be229e8f75367bbe3b325", - "sha256:56821e3791209e6a11992e294afbf7e3dcda7d4fd54d06396dd521928d3d14fe", - "sha256:62b594584889b33bbea7e71f9d7c5c6539091b341334ef7ca1ae7e30a9dd3e15", - "sha256:70f81a75fb25c1c3c61843e3a6fe771a76c4ebf4d154455a7eff0740ad47dff4", - "sha256:8011090beb09251cb4ece1e14263e574b38eda696b788552b369ad343373d0e9", - "sha256:80d6a0369333a162fc32a22637f5870f3e87fb038c7b58860bbe00b05b58aa62", - "sha256:85b04e32af58a3c008c0ba8169017770aaa342a5972b748f81d043d66363e437", - "sha256:9c4db4cfc8ac219b50484a505e3327142db04d5e9954aaed00ab729ef4beb142", - "sha256:9ed273d82116fa148c92901b9639030e087979d455982bd7bf727fb486c0bd17", - "sha256:a1af59e6c9b4acc07c429d8495fc016a35e0a1270f28c57317352f512df7e214", - "sha256:b894ff4daf8dfaf657bf2d5e7190a4de11b2400b1e0fb0902974d35c23a26dea", - "sha256:c2659981150b4de04397dcfd4bff64e384d3ba25af60d1b22820fdf108298cb2", - "sha256:c347d0a129c9742fefeaecf2455576c5ae73362aa01a27cea26fac237b7e2a87", - "sha256:c981a750858f1727995acf861ab030b267d264ca6efda2f01104941187a3675f", - "sha256:cc4152b19ec168391f7815d24b70c8911829ba281bd5fcd98cab9dc21abe62ff", - "sha256:d0f5b1668e7f7f6fc9849f49a20c5db10562a0ab29cd66818894dfebbca7b304", - "sha256:d7152006ed1a3adb8f978077b57d237ddafa188240af53cd72b5c79e4ed000e3", - "sha256:e5f877472993474296125c22b84c334b550010815e513cccce73da854a132d64", - "sha256:e7c2c87ff2f99ed4be1bb046d6eddfb388af627928037f9e0a420c05daaf14ed", - "sha256:edd7d499685655031be5b4d33005096b6345f81eeb7ab9d2dd415db0c7bcf64e", - "sha256:f99a777fda569a88deea863eac2722b5e88957c4d5f4413949740da791857ac9" + "sha256:01487236575df8f17b46982071438dce4f7eaf8acc8fb99fca3510d343cd7a28", + "sha256:0671d17c7a27634d6819246e535241b951141ed0e3f6f2a6d618fd32344dae3e", + "sha256:0e6190d6971c46729f712dd7307a9c0a8c027bfa5b4d8f2edef106b01759926c", + "sha256:202587c754901d0678bd6ff89c707f099987928239049a528470c06c6c922cf8", + "sha256:345197ba9278cf6a914cb7421dc665a0531a219b0072abf6b0cebfdf68e75725", + "sha256:3a296b8d6b02f0e01ab04bedea658f43eef5ad2f8e586a820226ead1a677d9b1", + "sha256:484572a2b22823a967be106137a93f7d634db116b3f7accb37dbd760eda2fa9f", + "sha256:4c67c9c803e50ceff32cc5e4769c50fc8ae8df9c4e5cc592ce8310b5a1076d23", + "sha256:539038087c321911745fc2e77049209b1231300d481cb4d682b2f95c724814b3", + "sha256:58113e0683c3688594c112103d7e9f2d0092fd2d8297a220240bea22e184dfdd", + "sha256:65cb25ca4284804293a2404d1be3b5a98818be21a72791649bacbcfa4e431d41", + "sha256:699e765da2580e34b08473fc0acef3a2d7bcb7f13eb29401cd25236bcf000080", + "sha256:6b54c3470810cea49a8be90814d05c5325ceb9c5bf429fd86c36fc1b32dfc157", + "sha256:71ac1629e4eae2ed329be8caf45efea10bfe1af3d8767e12e64b83e4ea5a3250", + "sha256:722c179d3df8677f3daf45b1a2764678ed4f0aaddbaa7211a8a08ebfd907c0db", + "sha256:76ac2b08d3d956d77b574bb43cbf1d37bd58b9d50c04ba281303e695854ebc46", + "sha256:7eff1157be9e26bf7494288c89979ca69d593a009e2c7420a739e2cf1e0635f5", + "sha256:99546c8696d27d0efa639c77b2f8af6e61dc3a5073caae4f27ffd991ca926f42", + "sha256:a0c263b31d335f29c11f4a9e98fbcd908d0731d4ea99bfd27c1c47caaeb4ca2e", + "sha256:a29c66292605bff962adc26530c030607aa699206b12dfb84f131b0454e15df4", + "sha256:a4d3724c5a1ddd86d7d830d8e02c40151839b833791dd4b6fe9e144380fa7d37", + "sha256:aed9f33b19d542eea56c38ef3862ca56147f7903648156cd57eabb0fe47c35d6", + "sha256:b57e733dd8871d2cc7358c2e0fe33027453afffbcd0ea6a537f54877cad5131c", + "sha256:d5bf4db62236e82955c40bafbaa18d54b20b5ceefa06fb57c7facc443929f4bd", + "sha256:d9272dd71ab78e87fa34a0a59bbd6acc9a9c0005c834a6fc8457ff9619dc6795", + "sha256:e9d5671bcbb90a41b0832fcb3872fcbaca3d68ff11ea09724dd6cbdf31d947fb", + "sha256:ee54646afb2b73b293c94cf079682d18d404ebd6c01122dc3980f111aec2d8ae", + "sha256:f16a87197939977824609005b73f9ebb291b9653a14e5f27afc1c5d6f981ba39" ], "index": "pypi", - "version": "==0.28.3" - }, - "e1839a8": { - "editable": true, - "path": "." + "version": "==0.28.4" }, "fancycompleter": { "hashes": [ @@ -438,45 +442,54 @@ "sha256:2df854df882d322d5c23087a4959e145b953dfff2abe1774fec4f639ac2f3160", "sha256:381ad13c30cd1d0b2f3da8a0c1a4aa697487e8bb0e9e0cbeb7439776bcb645f8", "sha256:385f1ce46e08676505b692bfde918c1e0b350963a15ef52d77691c2cf0f5dbf6", + "sha256:4130e5ae16c656b7de654dc5e595cfeb85d3a4b0bb0734d19c0dce6dc7ee0e07", "sha256:4d278c2261be6423c5e63d8f0ceb1b0c6db3ff83f2906f4b860db6ae99ca1bb5", "sha256:51c5dcb51cf88b34b7d04c15f600b07c6ccbb73a089a38af2ab83c02862318da", "sha256:589336ba5199c8061239cf446ee2f2f1fcc0c68e8531ee1382b6fc0c66b2d388", + "sha256:5ae3564cb630e155a650f4f9c054589848e97836bebae5637240a0d8099f817b", "sha256:5edf1acc827ed139086af95ce4449b7b664f57a8c29eb755411a634be280d9f2", "sha256:6b82b81c6b3b70ed40bc6d0b71222ebfcd6b6c04a6e7945a936e514b9113d5a3", "sha256:6c57f973218b776195d0356e556ec932698f3a563e2f640cfca7020086383f50", "sha256:758d1091a501fd2d75034e55e7e98bfd1370dc089160845c242db1c760d944d9", "sha256:8622db292b766719810e0cb0f62ef6141e15fe32b04e4eb2959888319e59336b", "sha256:8b8dcfcd630f1981f0f1e3846fae883376762a0c1b472baa35b145b911683b7b", + "sha256:91fdd510743ae4df862dbd51a4354519dd9fb8941347526cd9c2194b792b3da9", "sha256:97fa8f1dceffab782069b291e38c4c2227f255cdac5f1e3346666931df87373e", + "sha256:9b705f18b26fb551366ab6347ba9941b62272bf71c6bbcadcd8af94d10535241", "sha256:9d69967673ab7b028c2df09cae05ba56bf4e39e3cb04ebe452b6035c3b49848e", "sha256:9e1f53afae865cc32459ad211493cf9e2a3651a7295b7a38654ef3d123808996", "sha256:a4a433b3a264dbc9aa9c7c241e87c0358a503ea6394f8737df1683c7c9a102ac", "sha256:baadc5f770917ada556afb7651a68176559f4dca5f4b2d0947cd15b9fb84fb51", "sha256:c725d11990a9243e6ceffe0ab25a07c46c1cc2c5dc55e305717b5afe856c9608", "sha256:d696a8c87315a83983fc59dd27efe034292b9e8ad667aeae51a68b4be14690d9", - "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418" + "sha256:e1864a4e9f93ddb2dc6b62ccc2ec1f8250ff4ac0d3d7a15c8985dd4e1fbd6418", + "sha256:e1d18421a7e2ad4a655b76e65d549d4159f8874c18a417464c1d439ee7ccc7cd" ], "version": "==1.14.5" }, "pandas": { "hashes": [ - "sha256:211cfdb9f72f26d2ede21c751d27e08fed4434d47fb9bb82ebc8ff753888b8b6", - "sha256:28fd087514616549a0e3259cd68ac88d7eaed6bd3062017a7f312e27941266bd", - "sha256:2fb7c63138bd5ead296b18b2cb6abd3a394f7581e5ae052b02b27df8244b03ca", - "sha256:372435456c349a8d39ff001967b161f6bd29d4c3de145a4cf9b366648defbb1f", - "sha256:3790a3348ab0f416e58061d21693cb662fbb2f638001b94bf2b2199fedc1b1c2", - "sha256:437a6e906a6717a9ed2627cf6e7895b63dfaa0172567cbd75a553f55cf78cc17", - "sha256:50b52af2af2e15f4aeb2fe196da073a8c131fa02e433e105d95ce40016df5690", - "sha256:720daad75b5d35dd1b446842210c4f3fd447464c9c0884972f3f12b213a9edd1", - "sha256:b4fb71acbc2709b8f5993cb4b5445d8182864f11c39787e317aae39f21206270", - "sha256:b704fd73022342cce612996de495a16954311e0c0cf077c1b83d5cf0b9656a60", - "sha256:cbbecca0c7af6a2160b2d6ba30becc286824a98c61dcc6a41fada664f226424c", - "sha256:d2a071de755cc8ee7784e1b4c7b9b643d951d35c8adea7d64fe7c57cff9c47a7", - "sha256:d8154c5c68713a82461aba735832f0b4692be8a45a0a340a303bf90d6f80f36f", - "sha256:e1b86f7c55467ce1f6c12715f2fd1817f4a909b5c8c39bd4b5d2415ef2b04bd8", - "sha256:fcc63e8134516e93e16eb4ceac9afaa51f4adc5bf58efddae7cbc562f5b77dd0" + "sha256:05ac350f8a35abe6a02054f8cf54e0c048f13423b2acb87d018845afd736f0b4", + "sha256:174543cd68eaee60620146b38faaed950071f5665e0a4fa4adfdcfc23d7f7936", + "sha256:1a62a237fb7223c11d09daaeaf7d15f234bb836bfaf3d4f85746cdf9b2582f99", + "sha256:2c1ed1de5308918a7c6833df6db75a19c416c122921824e306c64a0626b3606c", + "sha256:33825ad26ce411d6526f903b3d02c0edf627223af59cf4b5876aa925578eec74", + "sha256:4c5f76fce8a4851f65374ea1d95ca24e9439540550e41e556c0879379517a6f5", + "sha256:67504a96f72fb4d7f051cfe77b9a7bb0d094c4e2e5a6efb2769eb80f36e6b309", + "sha256:683e0cc8c7faececbbc06aa4735709a07abad106099f165730c1015da916adec", + "sha256:77cd1b485c6a860b950ab3a85be7b5683eaacbc51cadf096db967886607d2231", + "sha256:814f8785f1ab412a7e9b9a8abb81dfe8727ebdeef850ecfaa262c04b1664000f", + "sha256:894216edaf7dd0a92623cdad423bbec2a23fc06eb9c85483e21876d1ef8f47e9", + "sha256:9331e20a07360b81d8c7b4b50223da387d264151d533a5a5853325800e6631a4", + "sha256:9cd3614b4e31a0889388ff1bd19ae857ad52658b33f776065793c293a29cf612", + "sha256:9d79e958adcd037eba3debbb66222804171197c0f5cd462315d1356aa72a5a30", + "sha256:b90e5d5460f23607310cbd1688a7517c96ce7b284095a48340d249dfc429172e", + "sha256:bc80c13ffddc7e269b706ed58002cc4c98cc135c36d827c99fb5ca54ced0eb7a", + "sha256:cbb074efb2a5e4956b261a670bfc2126b0ccfbf5b96b6ed021bc8c8cb56cf4a8", + "sha256:e8c62ab16feeda84d4732c42b7b67d7a89ad89df7e99efed80ea017bdc472f26", + "sha256:ff5ef271805fe877fe0d1337b6b1861113c44c75b9badb595c713a72cd337371" ], - "version": "==0.23.1" + "version": "==0.23.3" }, "pdbpp": { "hashes": [ @@ -509,11 +522,11 @@ }, "pytest": { "hashes": [ - "sha256:8ea01fc4fcc8e1b1e305252b4bc80a1528019ab99fd3b88666c9dc38d754406c", - "sha256:90898786b3d0b880b47645bae7b51aa9bbf1e9d1e4510c2cfd15dd65c70ea0cd" + "sha256:0453c8676c2bee6feb0434748b068d5510273a916295fd61d306c4f22fbfd752", + "sha256:4b208614ae6d98195430ad6bde03641c78553acee7c83cec2e85d613c0cd383d" ], "index": "pypi", - "version": "==3.6.2" + "version": "==3.6.3" }, "python-dateutil": { "hashes": [ @@ -524,10 +537,10 @@ }, "pytz": { "hashes": [ - "sha256:65ae0c8101309c45772196b21b74c46b2e5d11b6275c45d251b150d5da334555", - "sha256:c06425302f2cf668f1bba7a0a03f3c1d34d4ebeef2c72003da308b3947c7f749" + "sha256:a061aa0a9e06881eb8b3b2b43f05b9439d6583c206d0a6c340ff72a7b6669053", + "sha256:ffb9ef1de172603304d9d2819af6f5ece76f2e85ec10692a524dd876e72bf277" ], - "version": "==2018.4" + "version": "==2018.5" }, "six": { "hashes": [ @@ -548,6 +561,7 @@ "sha256:0496f575ed118eb382346a728971766d54b3a3e39e9825e94d4d513b0fe96145", "sha256:fc5513551d22ec2be8bd05ddd56e9c3c377ac47c79a3866fa2d8710bfae4a0cb" ], + "index": "pypi", "version": "==0.4.0" }, "wmctrl": { diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 506e1869..f92f3e53 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -9,10 +9,9 @@ from types import ModuleType from typing import Coroutine, Callable import trio +import tractor -from .. import tractor -from ..log import get_logger -from ..ipc import Channel +from ..log import get_logger, get_console_log from . import get_brokermod @@ -75,7 +74,7 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, - tickers2chans: {str: Channel}, + tickers2chans: {str: tractor.Channel}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue cid: str = None, @@ -246,8 +245,10 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): lambda ticker: ticker not in tickers, tickers2chans.copy() ): chanset = tickers2chans.get(ticker) - if chanset: - chanset.discard((chan, cid)) + # XXX: cid will be different on unsub call + for item in chanset.copy(): + if chan in item: + chanset.discard(item) if not chanset: # pop empty sets which will trigger bg quoter task termination @@ -257,7 +258,7 @@ def modify_quote_stream(broker, tickers, chan=None, cid=None): async def start_quote_stream( broker: str, tickers: [str], - chan: 'Channel' = None, + chan: tractor.Channel = None, cid: str = None, ) -> None: """Handle per-broker quote stream subscriptions. @@ -266,8 +267,11 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ + actor = tractor.current_actor() + # set log level after fork + get_console_log(actor.loglevel) # pull global vars from local actor - ss = tractor.current_actor().statespace + ss = actor.statespace broker2tickersubs = ss['broker2tickersubs'] clients = ss['clients'] dtasks = ss['dtasks'] diff --git a/piker/cli.py b/piker/cli.py index a703970a..401862ea 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -8,11 +8,11 @@ import os import click import pandas as pd import trio +import tractor from . import watchlists as wl from .brokers import core, get_brokermod from .log import get_console_log, colorize_json, get_logger -from . import tractor log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -38,6 +38,7 @@ def pikerd(loglevel, host): outlive_main=True, # run daemon forever rpc_module_paths=['piker.brokers.core'], name='brokerd', + loglevel=loglevel, ) diff --git a/piker/ipc.py b/piker/ipc.py deleted file mode 100644 index 7bc647cf..00000000 --- a/piker/ipc.py +++ /dev/null @@ -1,191 +0,0 @@ -""" -Inter-process comms abstractions -""" -from typing import Coroutine, Tuple - -import msgpack -import trio - -from .log import get_logger -log = get_logger('ipc') - - -class StreamQueue: - """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. - """ - def __init__(self, stream): - self.stream = stream - self._agen = self._iter_packets() - self._laddr = self.stream.socket.getsockname()[:2] - self._raddr = self.stream.socket.getpeername()[:2] - self._send_lock = trio.Lock() - - async def _iter_packets(self): - """Yield packets from the underlying stream. - """ - unpacker = msgpack.Unpacker(raw=False, use_list=False) - while True: - try: - data = await self.stream.receive_some(2**10) - log.trace(f"received {data}") - except trio.BrokenStreamError: - log.error(f"Stream connection {self.raddr} broke") - return - - if data == b'': - log.debug(f"Stream connection {self.raddr} was closed") - return - - unpacker.feed(data) - for packet in unpacker: - yield packet - - @property - def laddr(self): - return self._laddr - - @property - def raddr(self): - return self._raddr - - async def put(self, data): - async with self._send_lock: - return await self.stream.send_all( - msgpack.dumps(data, use_bin_type=True)) - - async def get(self): - return await self._agen.asend(None) - - async def __aiter__(self): - return self._agen - - def connected(self): - return self.stream.socket.fileno() != -1 - - -class Channel: - """A channel to actors in other processes. - - Use this to talk to any micro-service daemon or other client(s) over a - a transport managed by ``trio``. - """ - def __init__( - self, - destaddr: tuple = None, - on_reconnect: Coroutine = None, - auto_reconnect: bool = False, - stream: trio.SocketStream = None, # expected to be active - ) -> None: - self._recon_seq = on_reconnect - self._autorecon = auto_reconnect - self.squeue = StreamQueue(stream) if stream else None - if self.squeue and destaddr: - raise ValueError( - f"A stream was provided with local addr {self.laddr}" - ) - self._destaddr = destaddr or self.squeue.raddr - # set after handshake - always uid of far end - self.uid = None - - def __repr__(self): - if self.squeue: - return repr( - self.squeue.stream.socket._sock).replace( - "socket.socket", "Channel") - return object.__repr__(self) - - @property - def laddr(self): - return self.squeue.laddr if self.squeue else (None, None) - - @property - def raddr(self): - return self.squeue.raddr if self.squeue else (None, None) - - async def connect(self, destaddr: Tuple[str, int] = None, **kwargs): - if self.connected(): - raise RuntimeError("channel is already connected?") - destaddr = destaddr or self._destaddr - stream = await trio.open_tcp_stream(*destaddr, **kwargs) - self.squeue = StreamQueue(stream) - return stream - - async def send(self, item): - log.trace(f"send `{item}`") - await self.squeue.put(item) - - async def recv(self): - try: - return await self.squeue.get() - except trio.BrokenStreamError: - if self._autorecon: - await self._reconnect() - return await self.recv() - - async def aclose(self, *args): - log.debug(f"Closing {self}") - await self.squeue.stream.aclose() - - async def __aenter__(self): - await self.connect() - return self - - async def __aexit__(self, *args): - await self.aclose(*args) - - async def __aiter__(self): - return self.aiter_recv() - - async def _reconnect(self): - """Handle connection failures by polling until a reconnect can be - established. - """ - down = False - while True: - try: - with trio.move_on_after(3) as cancel_scope: - await self.connect() - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn( - "Reconnect timed out after 3 seconds, retrying...") - continue - else: - log.warn("Stream connection re-established!") - # run any reconnection sequence - on_recon = self._recon_seq - if on_recon: - await on_recon(self) - break - except (OSError, ConnectionRefusedError): - if not down: - down = True - log.warn( - f"Connection to {self.raddr} went down, waiting" - " for re-establishment") - await trio.sleep(1) - - async def aiter_recv(self): - """Async iterate items from underlying stream. - """ - while True: - try: - async for item in self.squeue: - yield item - # sent = yield item - # if sent is not None: - # # optimization, passing None through all the - # # time is pointless - # await self.squeue.put(sent) - except trio.BrokenStreamError: - if not self._autorecon: - raise - await self.aclose() - if self._autorecon: # attempt reconnect - await self._reconnect() - continue - else: - return - - def connected(self): - return self.squeue.connected() if self.squeue else False diff --git a/piker/tractor.py b/piker/tractor.py deleted file mode 100644 index cb7e121d..00000000 --- a/piker/tractor.py +++ /dev/null @@ -1,899 +0,0 @@ -""" -tracor: An actor model micro-framework. -""" -from collections import defaultdict -from functools import partial -from typing import Coroutine -import importlib -import inspect -import multiprocessing as mp -import traceback -import uuid - -import trio -from async_generator import asynccontextmanager - -from .ipc import Channel -from .log import get_console_log, get_logger - -ctx = mp.get_context("forkserver") -log = get_logger('tractor') - -# set at startup and after forks -_current_actor = None -_default_arbiter_host = '127.0.0.1' -_default_arbiter_port = 1616 - - -class ActorFailure(Exception): - "General actor failure" - - -class RemoteActorError(ActorFailure): - "Remote actor exception bundled locally" - - -@asynccontextmanager -async def maybe_open_nursery(nursery=None): - """Create a new nursery if None provided. - - Blocks on exit as expected if no input nursery is provided. - """ - if nursery is not None: - yield nursery - else: - async with trio.open_nursery() as nursery: - yield nursery - - -async def _invoke( - cid, chan, func, kwargs, - treat_as_gen=False, raise_errs=False, - task_status=trio.TASK_STATUS_IGNORED -): - """Invoke local func and return results over provided channel. - """ - try: - is_async_partial = False - if isinstance(func, partial): - is_async_partial = inspect.iscoroutinefunction(func.func) - - if not inspect.iscoroutinefunction(func) and not is_async_partial: - await chan.send({'return': func(**kwargs), 'cid': cid}) - else: - coro = func(**kwargs) - - if inspect.isasyncgen(coro): - async for item in coro: - # TODO: can we send values back in here? - # it's gonna require a `while True:` and - # some non-blocking way to retrieve new `asend()` - # values from the channel: - # to_send = await chan.recv_nowait() - # if to_send is not None: - # to_yield = await coro.asend(to_send) - await chan.send({'yield': item, 'cid': cid}) - else: - if treat_as_gen: - # XXX: the async-func may spawn further tasks which push - # back values like an async-generator would but must - # manualy construct the response dict-packet-responses as - # above - await coro - else: - await chan.send({'return': await coro, 'cid': cid}) - - task_status.started() - except Exception: - if not raise_errs: - await chan.send({'error': traceback.format_exc(), 'cid': cid}) - else: - raise - - -async def result_from_q(q): - """Process a msg from a remote actor. - """ - first_msg = await q.get() - if 'return' in first_msg: - return 'return', first_msg, q - elif 'yield' in first_msg: - return 'yield', first_msg, q - elif 'error' in first_msg: - raise RemoteActorError(first_msg['error']) - else: - raise ValueError(f"{first_msg} is an invalid response packet?") - - -async def _do_handshake(actor, chan): - await chan.send(actor.uid) - uid = await chan.recv() - - if not isinstance(uid, tuple): - raise ValueError(f"{uid} is not a valid uid?!") - - chan.uid = uid - log.info(f"Handshake with actor {uid}@{chan.raddr} complete") - return uid - - -class Actor: - """The fundamental concurrency primitive. - - An *actor* is the combination of a regular Python or - ``multiprocessing.Process`` executing a ``trio`` task tree, communicating - with other actors through "portals" which provide a native async API - around "channels". - """ - is_arbiter = False - - def __init__( - self, - name: str, - main: Coroutine = None, - rpc_module_paths: [str] = [], - statespace: dict = {}, - uid: str = None, - allow_rpc: bool = True, - outlive_main: bool = False, - ): - self.name = name - self.uid = (name, uid or str(uuid.uuid1())) - self.rpc_module_paths = rpc_module_paths - self._mods = {} - self.main = main - # TODO: consider making this a dynamically defined - # @dataclass once we get py3.7 - self.statespace = statespace - self._allow_rpc = allow_rpc - self._outlive_main = outlive_main - - # filled in by `_async_main` after fork - self._peers = defaultdict(list) - self._no_more_peers = trio.Event() - self._no_more_peers.set() - self._actors2calls = {} # map {uids -> {callids -> waiter queues}} - self._listeners = [] - self._parent_chan = None - self._accept_host = None - - async def wait_for_peer(self, uid): - """Wait for a connection back from a spawned actor with a given - ``uid``. - """ - log.debug(f"Waiting for peer {uid} to connect") - event = self._peers.setdefault(uid, trio.Event()) - await event.wait() - log.debug(f"{uid} successfully connected back to us") - return event, self._peers[uid][-1] - - def load_namespaces(self): - # We load namespaces after fork since this actor may - # be spawned on a different machine from the original nursery - # and we need to try and load the local module code (if it - # exists) - for path in self.rpc_module_paths: - self._mods[path] = importlib.import_module(path) - - async def _stream_handler( - self, - stream: trio.SocketStream, - ): - """ - Entry point for new inbound connections to the channel server. - """ - self._no_more_peers.clear() - chan = Channel(stream=stream) - log.info(f"New connection to us {chan}") - - # send/receive initial handshake response - try: - uid = await _do_handshake(self, chan) - except StopAsyncIteration: - log.warn(f"Channel {chan} failed to handshake") - return - - # channel tracking - event_or_chans = self._peers.pop(uid, None) - if isinstance(event_or_chans, trio.Event): - # Instructing connection: this is likely a new channel to - # a recently spawned actor which we'd like to control via - # async-rpc calls. - log.debug(f"Waking channel waiters {event_or_chans.statistics()}") - # Alert any task waiting on this connection to come up - event_or_chans.set() - event_or_chans.clear() # consumer can wait on channel to close - elif isinstance(event_or_chans, list): - log.warn( - f"already have channel(s) for {uid}:{event_or_chans}?" - ) - # append new channel - self._peers[uid].extend(event_or_chans) - - log.debug(f"Registered {chan} for {uid}") - self._peers[uid].append(chan) - - # Begin channel management - respond to remote requests and - # process received reponses. - try: - await self._process_messages(chan) - finally: - # Drop ref to channel so it can be gc-ed and disconnected - if chan is not self._parent_chan: - log.debug(f"Releasing channel {chan}") - chans = self._peers.get(chan.uid) - chans.remove(chan) - if not chans: - log.debug(f"No more channels for {chan.uid}") - self._peers.pop(chan.uid, None) - if not self._peers: # no more channels connected - self._no_more_peers.set() - log.debug(f"No more peer channels") - - def _push_result(self, actorid, cid, msg): - assert actorid, f"`actorid` can't be {actorid}" - q = self.get_waitq(actorid, cid) - log.debug(f"Delivering {msg} from {actorid} to caller {cid}") - waiters = q.statistics().tasks_waiting_get - if not waiters: - log.warn( - f"No tasks are currently waiting for results from call {cid}?") - q.put_nowait(msg) - - def get_waitq(self, actorid, cid): - log.debug(f"Registering for callid {cid} queue results from {actorid}") - cids2qs = self._actors2calls.setdefault(actorid, {}) - return cids2qs.setdefault(cid, trio.Queue(1000)) - - async def send_cmd(self, chan, ns, func, kwargs): - """Send a ``'cmd'`` message to a remote actor and return a - caller id and a ``trio.Queue`` that can be used to wait for - responses delivered by the local message processing loop. - """ - cid = str(uuid.uuid1()) - q = self.get_waitq(chan.uid, cid) - log.debug(f"Sending cmd to {chan.uid}: {ns}.{func}({kwargs})") - await chan.send({'cmd': (ns, func, kwargs, self.uid, cid)}) - return cid, q - - async def _process_messages(self, chan, treat_as_gen=False): - """Process messages async-RPC style. - - Process rpc requests and deliver retrieved responses from channels. - """ - # TODO: once https://github.com/python-trio/trio/issues/467 gets - # worked out we'll likely want to use that! - log.debug(f"Entering msg loop for {chan}") - async with trio.open_nursery() as nursery: - try: - async for msg in chan.aiter_recv(): - if msg is None: # terminate sentinel - log.debug(f"Cancelling all tasks for {chan}") - nursery.cancel_scope.cancel() - log.debug(f"Terminating msg loop for {chan}") - break - log.debug(f"Received msg {msg}") - cid = msg.get('cid') - if cid: # deliver response to local caller/waiter - self._push_result(chan.uid, cid, msg) - if 'error' in msg: - # TODO: need something better then this slop - raise RemoteActorError(msg['error']) - log.debug(f"Waiting on next msg for {chan}") - continue - else: - ns, funcname, kwargs, actorid, cid = msg['cmd'] - - log.debug( - f"Processing request from {actorid}\n" - f"{ns}.{funcname}({kwargs})") - if ns == 'self': - func = getattr(self, funcname) - else: - func = getattr(self._mods[ns], funcname) - - # spin up a task for the requested function - sig = inspect.signature(func) - treat_as_gen = False - if 'chan' in sig.parameters: - assert 'cid' in sig.parameters, \ - f"{func} must accept a `cid` (caller id) kwarg" - kwargs['chan'] = chan - kwargs['cid'] = cid - # TODO: eventually we want to be more stringent - # about what is considered a far-end async-generator. - # Right now both actual async gens and any async - # function which declares a `chan` kwarg in its - # signature will be treated as one. - treat_as_gen = True - - log.debug(f"Spawning task for {func}") - nursery.start_soon( - _invoke, cid, chan, func, kwargs, treat_as_gen, - name=funcname - ) - log.debug(f"Waiting on next msg for {chan}") - else: # channel disconnect - log.debug(f"{chan} disconnected") - except trio.ClosedStreamError: - log.error(f"{chan} broke") - - log.debug(f"Exiting msg loop for {chan}") - - def _fork_main(self, accept_addr, parent_addr=None, loglevel=None): - # after fork routine which invokes a fresh ``trio.run`` - log.info( - f"Started new {ctx.current_process()} for actor {self.uid}") - global _current_actor - _current_actor = self - if loglevel: - get_console_log(loglevel) - log.debug(f"parent_addr is {parent_addr}") - try: - trio.run(partial( - self._async_main, accept_addr, parent_addr=parent_addr)) - except KeyboardInterrupt: - pass # handle it the same way trio does? - log.debug(f"Actor {self.uid} terminated") - - async def _async_main( - self, - accept_addr, - arbiter_addr=(_default_arbiter_host, _default_arbiter_port), - parent_addr=None, - nursery=None - ): - """Start the channel server and main task. - - A "root-most" (or "top-level") nursery for this actor is opened here - and when cancelled effectively cancels the actor. - """ - result = None - try: - async with maybe_open_nursery(nursery) as nursery: - self._root_nursery = nursery - - # Startup up channel server - host, port = accept_addr - await nursery.start(partial( - self._serve_forever, accept_host=host, accept_port=port) - ) - - if parent_addr is not None: - # Connect back to the parent actor and conduct initial - # handshake (From this point on if we error ship the - # exception back to the parent actor) - chan = self._parent_chan = Channel( - destaddr=parent_addr, - on_reconnect=self.main - ) - await chan.connect() - # initial handshake, report who we are, who they are - await _do_handshake(self, chan) - - # handle new connection back to parent optionally - # begin responding to RPC - if self._allow_rpc: - self.load_namespaces() - if self._parent_chan: - nursery.start_soon( - self._process_messages, self._parent_chan) - - # register with the arbiter if we're told its addr - log.debug(f"Registering {self} for role `{self.name}`") - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) - - if self.main: - if self._parent_chan: - log.debug(f"Starting main task `{self.main}`") - # start "main" routine in a task - await nursery.start( - _invoke, 'main', self._parent_chan, self.main, {}, - False, True # treat_as_gen, raise_errs params - ) - else: - # run directly - log.debug(f"Running `{self.main}` directly") - result = await self.main() - - # terminate local in-proc once its main completes - log.debug( - f"Waiting for remaining peers {self._peers} to clear") - await self._no_more_peers.wait() - log.debug(f"All peer channels are complete") - - # tear down channel server - if not self._outlive_main: - log.debug(f"Shutting down channel server") - self.cancel_server() - - # blocks here as expected if no nursery was provided until - # the channel server is killed (i.e. this actor is - # cancelled or signalled by the parent actor) - except Exception: - if self._parent_chan: - log.exception("Actor errored:") - await self._parent_chan.send( - {'error': traceback.format_exc(), 'cid': 'main'}) - else: - raise - finally: - # UNregister actor from the arbiter - try: - if arbiter_addr is not None: - async with get_arbiter(*arbiter_addr) as arb_portal: - await arb_portal.run( - 'self', 'register_actor', - name=self.name, sockaddr=self.accept_addr) - except OSError: - log.warn(f"Unable to unregister {self.name} from arbiter") - - return result - - async def _serve_forever( - self, - *, - # (host, port) to bind for channel server - accept_host=None, - accept_port=0, - task_status=trio.TASK_STATUS_IGNORED - ): - """Main coroutine: connect back to the parent, spawn main task, begin - listening for new messages. - - """ - async with trio.open_nursery() as nursery: - self._server_nursery = nursery - # TODO: might want to consider having a separate nursery - # for the stream handler such that the server can be cancelled - # whilst leaving existing channels up - listeners = await nursery.start( - partial( - trio.serve_tcp, - self._stream_handler, - handler_nursery=self._root_nursery, - port=accept_port, host=accept_host, - ) - ) - log.debug( - f"Started tcp server(s) on {[l.socket for l in listeners]}") - self._listeners.extend(listeners) - task_status.started() - - def cancel(self): - """This cancels the internal root-most nursery thereby gracefully - cancelling (for all intents and purposes) this actor. - """ - self._root_nursery.cancel_scope.cancel() - - def cancel_server(self): - """Cancel the internal channel server nursery thereby - preventing any new inbound connections from being established. - """ - self._server_nursery.cancel_scope.cancel() - - @property - def accept_addr(self): - """Primary address to which the channel server is bound. - """ - try: - return self._listeners[0].socket.getsockname() - except OSError: - return - - def get_parent(self): - return Portal(self._parent_chan) - - def get_chans(self, actorid): - return self._peers[actorid] - - -class Arbiter(Actor): - """A special actor who knows all the other actors and always has - access to the top level nursery. - - The arbiter is by default the first actor spawned on each host - and is responsible for keeping track of all other actors for - coordination purposes. If a new main process is launched and an - arbiter is already running that arbiter will be used. - """ - _registry = defaultdict(list) - is_arbiter = True - - def find_actor(self, name): - return self._registry[name] - - def register_actor(self, name, sockaddr): - self._registry[name].append(sockaddr) - - def unregister_actor(self, name, sockaddr): - sockaddrs = self._registry.get(name) - if sockaddrs: - try: - sockaddrs.remove(sockaddr) - except ValueError: - pass - - -class Portal: - """A 'portal' to a(n) (remote) ``Actor``. - - Allows for invoking remote routines and receiving results through an - underlying ``tractor.Channel`` as though the remote (async) - function / generator was invoked locally. - - Think of this like an native async IPC API. - """ - def __init__(self, channel): - self.channel = channel - - async def aclose(self): - log.debug(f"Closing {self}") - # XXX: won't work until https://github.com/python-trio/trio/pull/460 - # gets in! - await self.channel.aclose() - - async def run(self, ns, func, **kwargs): - """Submit a function to be scheduled and run by actor, return its - (stream of) result(s). - """ - # TODO: not this needs some serious work and thinking about how - # to make async-generators the fundamental IPC API over channels! - # (think `yield from`, `gen.send()`, and functional reactive stuff) - chan = self.channel - # ship a function call request to the remote actor - actor = current_actor() - - cid, q = await actor.send_cmd(chan, ns, func, kwargs) - # wait on first response msg - resptype, first_msg, q = await result_from_q(q) - - if resptype == 'yield': - - async def yield_from_q(): - yield first_msg['yield'] - try: - async for msg in q: - try: - yield msg['yield'] - except KeyError: - raise RemoteActorError(msg['error']) - except GeneratorExit: - log.debug(f"Cancelling async gen call {cid} to {chan.uid}") - - return yield_from_q() - - elif resptype == 'return': - return first_msg['return'] - else: - raise ValueError(f"Unknown msg response type: {first_msg}") - - -@asynccontextmanager -async def open_portal(channel, nursery=None): - """Open a ``Portal`` through the provided ``channel``. - - Spawns a background task to handle rpc message processing. - """ - actor = current_actor() - assert actor - was_connected = False - - async with maybe_open_nursery(nursery) as nursery: - - if not channel.connected(): - await channel.connect() - was_connected = True - - if channel.uid is None: - await _do_handshake(actor, channel) - - if not actor.get_chans(channel.uid): - # actor is not currently managing this channel - actor._peers[channel.uid].append(channel) - - nursery.start_soon(actor._process_messages, channel) - yield Portal(channel) - - # cancel background msg loop task - nursery.cancel_scope.cancel() - if was_connected: - actor._peers[channel.uid].remove(channel) - await channel.aclose() - - -class LocalPortal: - """A 'portal' to a local ``Actor``. - - A compatibility shim for normal portals but for invoking functions - using an in process actor instance. - """ - def __init__(self, actor): - self.actor = actor - - async def run(self, ns, func, **kwargs): - """Run a requested function locally and return it's result. - """ - obj = self.actor if ns == 'self' else importlib.import_module(ns) - func = getattr(obj, func) - return func(**kwargs) - - -class ActorNursery: - """Spawn scoped subprocess actors. - """ - def __init__(self, actor, supervisor=None): - self.supervisor = supervisor - self._actor = actor - # We'll likely want some way to cancel all sub-actors eventually - # self.cancel_scope = cancel_scope - self._children = {} - - async def __aenter__(self): - return self - - async def start_actor( - self, - name: str, - main=None, - bind_addr=('127.0.0.1', 0), - statespace=None, - rpc_module_paths=None, - outlive_main=False, # sub-actors die when their main task completes - loglevel=None, # set console logging per subactor - ): - actor = Actor( - name, - # modules allowed to invoked funcs from - rpc_module_paths=rpc_module_paths, - statespace=statespace, # global proc state vars - main=main, # main coroutine to be invoked - outlive_main=outlive_main, - ) - parent_addr = self._actor.accept_addr - assert parent_addr - proc = ctx.Process( - target=actor._fork_main, - args=(bind_addr, parent_addr, loglevel), - daemon=True, - name=name, - ) - proc.start() - if not proc.is_alive(): - raise ActorFailure("Couldn't start sub-actor?") - - # wait for actor to spawn and connect back to us - # channel should have handshake completed by the - # local actor by the time we get a ref to it - event, chan = await self._actor.wait_for_peer(actor.uid) - # channel is up, get queue which delivers result from main routine - main_q = self._actor.get_waitq(actor.uid, 'main') - self._children[(name, proc.pid)] = (actor, proc, main_q) - - return Portal(chan) - - async def wait(self): - - async def wait_for_proc(proc): - # TODO: timeout block here? - if proc.is_alive(): - await trio.hazmat.wait_readable(proc.sentinel) - # please god don't hang - proc.join() - log.debug(f"Joined {proc}") - - # unblocks when all waiter tasks have completed - async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_proc, proc) - - async def cancel(self, hard_kill=False): - log.debug(f"Cancelling nursery") - for subactor, proc, main_q in self._children.values(): - if proc is mp.current_process(): - # XXX: does this even make sense? - await subactor.cancel() - else: - if hard_kill: - log.warn(f"Hard killing subactors {self._children}") - proc.terminate() - # send KeyBoardInterrupt (trio abort signal) to underlying - # sub-actors - # os.kill(proc.pid, signal.SIGINT) - else: - # send cancel cmd - likely no response from subactor - actor = self._actor - chans = actor.get_chans(subactor.uid) - if chans: - for chan in chans: - await actor.send_cmd(chan, 'self', 'cancel', {}) - else: - log.warn( - f"Channel for {subactor.uid} is already down?") - log.debug(f"Waiting on all subactors to complete") - await self.wait() - log.debug(f"All subactors for {self} have terminated") - - async def __aexit__(self, etype, value, tb): - """Wait on all subactor's main routines to complete. - """ - async def wait_for_actor(actor, proc, q): - if proc.is_alive(): - ret_type, msg, q = await result_from_q(q) - log.info(f"{actor.uid} main task completed with {msg}") - if not actor._outlive_main: - # trigger msg loop to break - chans = self._actor.get_chans(actor.uid) - for chan in chans: - log.info(f"Signalling msg loop exit for {actor.uid}") - await chan.send(None) - - if etype is not None: - log.warn(f"{current_actor().uid} errored with {etype}, " - "cancelling actor nursery") - await self.cancel() - else: - log.debug(f"Waiting on subactors to complete") - async with trio.open_nursery() as nursery: - for subactor, proc, main_q in self._children.values(): - nursery.start_soon(wait_for_actor, subactor, proc, main_q) - - await self.wait() - log.debug(f"Nursery teardown complete") - - -def current_actor() -> Actor: - """Get the process-local actor instance. - """ - return _current_actor - - -@asynccontextmanager -async def open_nursery(supervisor=None, loglevel='WARNING'): - """Create and yield a new ``ActorNursery``. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - # TODO: figure out supervisors from erlang - async with ActorNursery(current_actor(), supervisor) as nursery: - yield nursery - - -class NoArbiterFound(Exception): - "Couldn't find the arbiter?" - - -async def start_actor(actor, host, port, arbiter_addr, nursery=None): - """Spawn a local actor by starting a task to execute it's main - async function. - - Blocks if no nursery is provided, in which case it is expected the nursery - provider is responsible for waiting on the task to complete. - """ - # assign process-local actor - global _current_actor - _current_actor = actor - - # start local channel-server and fake the portal API - # NOTE: this won't block since we provide the nursery - log.info(f"Starting local {actor} @ {host}:{port}") - - await actor._async_main( - accept_addr=(host, port), - parent_addr=None, - arbiter_addr=arbiter_addr, - nursery=nursery, - ) - # XXX: If spawned locally, the actor is cancelled when this - # context is complete given that there are no more active - # peer channels connected to it. - if not actor._outlive_main: - actor.cancel_server() - - # unset module state - _current_actor = None - log.info("Completed async main") - - -@asynccontextmanager -async def _connect_chan(host, port): - """Attempt to connect to an arbiter's channel server. - Return the channel on success or None on failure. - """ - chan = Channel((host, port)) - await chan.connect() - yield chan - await chan.aclose() - - -@asynccontextmanager -async def get_arbiter(host, port): - """Return a portal instance connected to a local or remote - arbiter. - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - if actor.is_arbiter: - # we're already the arbiter - # (likely a re-entrant call from the arbiter actor) - yield LocalPortal(actor) - else: - async with _connect_chan(host, port) as chan: - async with open_portal(chan) as arb_portal: - yield arb_portal - - -@asynccontextmanager -async def find_actor( - name, - arbiter_sockaddr=(_default_arbiter_host, _default_arbiter_port) -): - """Ask the arbiter to find actor(s) by name. - - Returns a sequence of unconnected portals for each matching actor - known to the arbiter (client code is expected to connect the portals). - """ - actor = current_actor() - if not actor: - raise RuntimeError("No actor instance has been defined yet?") - - async with get_arbiter(*arbiter_sockaddr) as arb_portal: - sockaddrs = await arb_portal.run('self', 'find_actor', name=name) - # TODO: return portals to all available actors - for now just - # the first one we find - if sockaddrs: - sockaddr = sockaddrs[-1] - async with _connect_chan(*sockaddr) as chan: - async with open_portal(chan) as portal: - yield portal - else: - yield - - -async def _main(async_fn, args, kwargs, name, arbiter_addr): - """Async entry point for ``tractor``. - """ - main = partial(async_fn, *args) if async_fn else None - arbiter_addr = (host, port) = arbiter_addr or ( - _default_arbiter_host, _default_arbiter_port) - # make a temporary connection to see if an arbiter exists - arbiter_found = False - try: - async with _connect_chan(host, port): - arbiter_found = True - except OSError: - log.warn(f"No actor could be found @ {host}:{port}") - - if arbiter_found: # we were able to connect to an arbiter - log.info(f"Arbiter seems to exist @ {host}:{port}") - # create a local actor and start up its main routine/task - actor = Actor( - name or 'anonymous', - main=main, - **kwargs - ) - host, port = (_default_arbiter_host, 0) - else: - # start this local actor as the arbiter - actor = Arbiter(name or 'arbiter', main=main, **kwargs) - - await start_actor(actor, host, port, arbiter_addr=arbiter_addr) - # Creates an internal nursery which shouldn't be cancelled even if - # the one opened below is (this is desirable because the arbiter should - # stay up until a re-election process has taken place - which is not - # implemented yet FYI). - - -def run(async_fn, *args, name=None, arbiter_addr=None, **kwargs): - """Run a trio-actor async function in process. - - This is tractor's main entry and the start point for any async actor. - """ - return trio.run(_main, async_fn, args, kwargs, name, arbiter_addr) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 21473c17..3bef11f1 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -371,7 +371,7 @@ async def update_quotes( grid.render_rows(cache) log.debug("Waiting on quotes") - log.warn("Server connection dropped") + log.warn("`brokerd` connection dropped") nursery.cancel_scope.cancel() @@ -391,80 +391,82 @@ async def _async_main(name, portal, tickers, brokermod, rate): "piker.brokers.core", 'start_quote_stream', broker=brokermod.name, tickers=tickers) - try: - async with trio.open_nursery() as nursery: - # get first quotes response - log.debug("Waiting on first quote...") - quotes = await agen.__anext__() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + async with trio.open_nursery() as nursery: + # get first quotes response + log.debug("Waiting on first quote...") + quotes = await agen.__anext__() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, agen, sd, quotes) + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, agen, sd, quotes) + try: # Trio-kivy entry point. await async_runTouchApp(widgets['root']) # run kivy await agen.aclose() # cancel aysnc gen call - finally: - # un-subscribe from symbols stream - await portal.run( - "piker.brokers.core", 'modify_quote_stream', - broker=brokermod.name, tickers=[]) + finally: + # un-subscribe from symbols stream (cancel if brokerd + # was already torn down - say by SIGINT) + with trio.move_on_after(0.2): + await portal.run( + "piker.brokers.core", 'modify_quote_stream', + broker=brokermod.name, tickers=[]) - # cancel GUI update task - nursery.cancel_scope.cancel() + # cancel GUI update task + nursery.cancel_scope.cancel() diff --git a/tests/test_tractor.py b/tests/test_tractor.py index 7200dfb0..3027fc9f 100644 --- a/tests/test_tractor.py +++ b/tests/test_tractor.py @@ -1,12 +1,8 @@ """ Actor model API testing """ -import time -from functools import partial - import pytest -import trio -from piker import tractor +import tractor @pytest.fixture @@ -14,78 +10,6 @@ def us_symbols(): return ['TSLA', 'AAPL', 'CGC', 'CRON'] -@pytest.mark.trio -async def test_no_arbitter(): - """An arbitter must be established before any nurseries - can be created. - - (In other words ``tractor.run`` must be used instead of ``trio.run`` as is - done by the ``pytest-trio`` plugin.) - """ - with pytest.raises(RuntimeError): - with tractor.open_nursery(): - pass - - -def test_local_actor_async_func(): - """Verify a simple async function in-process. - """ - nums = [] - - async def print_loop(): - # arbiter is started in-proc if dne - assert tractor.current_actor().is_arbiter - - for i in range(10): - nums.append(i) - await trio.sleep(0.1) - - start = time.time() - tractor.run(print_loop) - - # ensure the sleeps were actually awaited - assert time.time() - start >= 1 - assert nums == list(range(10)) - - -# NOTE: this func must be defined at module level in order for the -# interal pickling infra of the forkserver to work -async def spawn(is_arbiter): - statespace = {'doggy': 10, 'kitty': 4} - namespaces = ['piker.brokers.core'] - - await trio.sleep(0.1) - actor = tractor.current_actor() - assert actor.is_arbiter == is_arbiter - - # arbiter should always have an empty statespace as it's redundant - assert actor.statespace == statespace - - if actor.is_arbiter: - async with tractor.open_nursery() as nursery: - # forks here - portal = await nursery.start_actor( - 'sub-actor', - main=partial(spawn, False), - statespace=statespace, - rpc_module_paths=namespaces, - ) - assert len(nursery._children) == 1 - assert portal.channel.uid in tractor.current_actor()._peers - else: - return 10 - - -def test_local_arbiter_subactor_global_state(): - statespace = {'doggy': 10, 'kitty': 4} - tractor.run( - spawn, - True, - name='arbiter', - statespace=statespace, - ) - - async def rx_price_quotes_from_brokerd(us_symbols): """Verify we can spawn a daemon actor and retrieve streamed price data. """